1 /*
2    Copyright (c) 2005, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "lgman.hpp"
26 #include "diskpage.hpp"
27 #include <signaldata/FsRef.hpp>
28 #include <signaldata/FsConf.hpp>
29 #include <signaldata/FsOpenReq.hpp>
30 #include <signaldata/FsCloseReq.hpp>
31 #include <signaldata/CreateFilegroupImpl.hpp>
32 #include <signaldata/DropFilegroupImpl.hpp>
33 #include <signaldata/FsReadWriteReq.hpp>
34 #include <signaldata/LCP.hpp>
35 #include <signaldata/SumaImpl.hpp>
36 #include <signaldata/LgmanContinueB.hpp>
37 #include <signaldata/GetTabInfo.hpp>
38 #include <signaldata/NodeFailRep.hpp>
39 #include <signaldata/DbinfoScan.hpp>
40 #include "dbtup/Dbtup.hpp"
41 
42 #include <EventLogger.hpp>
43 extern EventLogger * g_eventLogger;
44 
45 #include <record_types.hpp>
46 
/**
 * ---<a>-----<b>-----<c>-----<d>---> (time)
 *
 * <a> = start of lcp 1
 * <b> = stop of lcp 1
 * <c> = start of lcp 2
 * <d> = stop of lcp 2
 *
 * If ndb crashes before <d>,
 *   the entire undo log from the crash point back to <a> has to be applied.
 *
 * At <d> the undo log can be cut up to <c>.
 */
60 
// Debug switches; both are set to 0 here.
#define DEBUG_UNDO_EXECUTION 0
#define DEBUG_SEARCH_LOG_HEAD 0

// Twice File_formats::UNDO_PAGE_WORDS. In the visible code it is printed next
// to the size of the first log-buffer waiter by execDUMP_STATE_ORD.
#define FREE_BUFFER_MARGIN (2 * File_formats::UNDO_PAGE_WORDS)
65 
Lgman(Block_context & ctx)66 Lgman::Lgman(Block_context & ctx) :
67   SimulatedBlock(LGMAN, ctx),
68   m_tup(0),
69   m_logfile_group_list(m_logfile_group_pool),
70   m_logfile_group_hash(m_logfile_group_pool),
71   m_client_mutex("lgman-client", 2, true)
72 {
73   BLOCK_CONSTRUCTOR(Lgman);
74 
75   // Add received signals
76   addRecSignal(GSN_STTOR, &Lgman::execSTTOR);
77   addRecSignal(GSN_READ_CONFIG_REQ, &Lgman::execREAD_CONFIG_REQ);
78   addRecSignal(GSN_DUMP_STATE_ORD, &Lgman::execDUMP_STATE_ORD);
79   addRecSignal(GSN_DBINFO_SCANREQ, &Lgman::execDBINFO_SCANREQ);
80   addRecSignal(GSN_CONTINUEB, &Lgman::execCONTINUEB);
81   addRecSignal(GSN_NODE_FAILREP, &Lgman::execNODE_FAILREP);
82 
83   addRecSignal(GSN_CREATE_FILE_IMPL_REQ, &Lgman::execCREATE_FILE_IMPL_REQ);
84   addRecSignal(GSN_CREATE_FILEGROUP_IMPL_REQ,
85                &Lgman::execCREATE_FILEGROUP_IMPL_REQ);
86 
87   addRecSignal(GSN_DROP_FILE_IMPL_REQ, &Lgman::execDROP_FILE_IMPL_REQ);
88   addRecSignal(GSN_DROP_FILEGROUP_IMPL_REQ,
89                &Lgman::execDROP_FILEGROUP_IMPL_REQ);
90 
91   addRecSignal(GSN_FSWRITEREQ, &Lgman::execFSWRITEREQ);
92   addRecSignal(GSN_FSWRITEREF, &Lgman::execFSWRITEREF, true);
93   addRecSignal(GSN_FSWRITECONF, &Lgman::execFSWRITECONF);
94 
95   addRecSignal(GSN_FSOPENREF, &Lgman::execFSOPENREF, true);
96   addRecSignal(GSN_FSOPENCONF, &Lgman::execFSOPENCONF);
97 
98   addRecSignal(GSN_FSCLOSECONF, &Lgman::execFSCLOSECONF);
99 
100   addRecSignal(GSN_FSREADREF, &Lgman::execFSREADREF, true);
101   addRecSignal(GSN_FSREADCONF, &Lgman::execFSREADCONF);
102 
103   addRecSignal(GSN_LCP_FRAG_ORD, &Lgman::execLCP_FRAG_ORD);
104   addRecSignal(GSN_END_LCP_REQ, &Lgman::execEND_LCP_REQ);
105   addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Lgman::execSUB_GCP_COMPLETE_REP);
106   addRecSignal(GSN_START_RECREQ, &Lgman::execSTART_RECREQ);
107 
108   addRecSignal(GSN_END_LCP_CONF, &Lgman::execEND_LCP_CONF);
109 
110   addRecSignal(GSN_GET_TABINFOREQ, &Lgman::execGET_TABINFOREQ);
111 
112   m_last_lsn = 1;
113   m_logfile_group_hash.setSize(10);
114 
115   if (isNdbMtLqh()) {
116     jam();
117     int ret = m_client_mutex.create();
118     ndbrequire(ret == 0);
119   }
120 
121   {
122     CallbackEntry& ce = m_callbackEntry[THE_NULL_CALLBACK];
123     ce.m_function = TheNULLCallback.m_callbackFunction;
124     ce.m_flags = 0;
125   }
126   {
127     CallbackEntry& ce = m_callbackEntry[ENDLCP_CALLBACK];
128     ce.m_function = safe_cast(&Lgman::endlcp_callback);
129     ce.m_flags = 0;
130   }
131   {
132     CallbackTable& ct = m_callbackTable;
133     ct.m_count = COUNT_CALLBACKS;
134     ct.m_entry = m_callbackEntry;
135     m_callbackTableAddr = &ct;
136   }
137 }
138 
~Lgman()139 Lgman::~Lgman()
140 {
141   if (isNdbMtLqh()) {
142     (void)m_client_mutex.destroy();
143   }
144 }
145 
146 void
client_lock(BlockNumber block,int line)147 Lgman::client_lock(BlockNumber block, int line)
148 {
149   if (isNdbMtLqh()) {
150 #ifdef VM_TRACE
151     Uint32 bno = blockToMain(block);
152     Uint32 ino = blockToInstance(block);
153 #endif
154     D("try lock " << bno << "/" << ino << V(line));
155     int ret = m_client_mutex.lock();
156     ndbrequire(ret == 0);
157     D("got lock " << bno << "/" << ino << V(line));
158   }
159 }
160 
161 void
client_unlock(BlockNumber block,int line)162 Lgman::client_unlock(BlockNumber block, int line)
163 {
164   if (isNdbMtLqh()) {
165 #ifdef VM_TRACE
166     Uint32 bno = blockToMain(block);
167     Uint32 ino = blockToInstance(block);
168 #endif
169     D("unlock " << bno << "/" << ino << V(line));
170     int ret = m_client_mutex.unlock();
171     ndbrequire(ret == 0);
172   }
173 }
174 
// Expands BLOCK_FUNCTIONS for Lgman; the macro itself is not defined in the
// visible part of this file.
BLOCK_FUNCTIONS(Lgman)
176 
177 void
178 Lgman::execREAD_CONFIG_REQ(Signal* signal)
179 {
180   jamEntry();
181 
182   const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
183 
184   Uint32 ref = req->senderRef;
185   Uint32 senderData = req->senderData;
186 
187   const ndb_mgm_configuration_iterator * p =
188     m_ctx.m_config.getOwnConfigIterator();
189   ndbrequire(p != 0);
190 
191   Pool_context pc;
192   pc.m_block = this;
193   m_log_waiter_pool.wo_pool_init(RT_LGMAN_LOG_WAITER, pc);
194   m_file_pool.init(RT_LGMAN_FILE, pc);
195   m_logfile_group_pool.init(RT_LGMAN_FILEGROUP, pc);
196   // 10 -> 150M
197   m_data_buffer_pool.setSize(40);
198 
199   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
200   conf->senderRef = reference();
201   conf->senderData = senderData;
202   sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
203 	     ReadConfigConf::SignalLength, JBB);
204 }
205 
206 void
execSTTOR(Signal * signal)207 Lgman::execSTTOR(Signal* signal)
208 {
209   jamEntry();
210   Uint32 startPhase = signal->theData[1];
211   switch (startPhase) {
212   case 1:
213     m_tup = globalData.getBlock(DBTUP);
214     ndbrequire(m_tup != 0);
215     break;
216   }
217   sendSTTORRY(signal);
218 }
219 
220 void
sendSTTORRY(Signal * signal)221 Lgman::sendSTTORRY(Signal* signal)
222 {
223   signal->theData[0] = 0;
224   signal->theData[3] = 1;
225   signal->theData[4] = 2;
226   signal->theData[5] = 3;
227   signal->theData[6] = 4;
228   signal->theData[7] = 5;
229   signal->theData[8] = 6;
230   signal->theData[9] = 255; // No more start phases from missra
231   sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
232 }
233 
234 void
execCONTINUEB(Signal * signal)235 Lgman::execCONTINUEB(Signal* signal){
236   jamEntry();
237 
238   Uint32 type= signal->theData[0];
239   Uint32 ptrI = signal->theData[1];
240   client_lock(number(), __LINE__);
241   switch(type){
242   case LgmanContinueB::FILTER_LOG:
243     jam();
244     break;
245   case LgmanContinueB::CUT_LOG_TAIL:
246   {
247     jam();
248     Ptr<Logfile_group> ptr;
249     m_logfile_group_pool.getPtr(ptr, ptrI);
250     cut_log_tail(signal, ptr);
251     break;
252   }
253   case LgmanContinueB::FLUSH_LOG:
254   {
255     jam();
256     Ptr<Logfile_group> ptr;
257     m_logfile_group_pool.getPtr(ptr, ptrI);
258     flush_log(signal, ptr, signal->theData[2]);
259     break;
260   }
261   case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
262   {
263     jam();
264     Ptr<Logfile_group> ptr;
265     m_logfile_group_pool.getPtr(ptr, ptrI);
266     process_log_buffer_waiters(signal, ptr);
267     break;
268   }
269   case LgmanContinueB::FIND_LOG_HEAD:
270     jam();
271     Ptr<Logfile_group> ptr;
272     if(ptrI != RNIL)
273     {
274       jam();
275       m_logfile_group_pool.getPtr(ptr, ptrI);
276       find_log_head(signal, ptr);
277     }
278     else
279     {
280       jam();
281       init_run_undo_log(signal);
282     }
283     break;
284   case LgmanContinueB::EXECUTE_UNDO_RECORD:
285     jam();
286     execute_undo_record(signal);
287     break;
288   case LgmanContinueB::STOP_UNDO_LOG:
289     jam();
290     stop_run_undo_log(signal);
291     break;
292   case LgmanContinueB::READ_UNDO_LOG:
293   {
294     jam();
295     Ptr<Logfile_group> ptr;
296     m_logfile_group_pool.getPtr(ptr, ptrI);
297     read_undo_log(signal, ptr);
298     break;
299   }
300   case LgmanContinueB::PROCESS_LOG_SYNC_WAITERS:
301   {
302     jam();
303     Ptr<Logfile_group> ptr;
304     m_logfile_group_pool.getPtr(ptr, ptrI);
305     process_log_sync_waiters(signal, ptr);
306     break;
307   }
308   case LgmanContinueB::FORCE_LOG_SYNC:
309   {
310     jam();
311     Ptr<Logfile_group> ptr;
312     m_logfile_group_pool.getPtr(ptr, ptrI);
313     force_log_sync(signal, ptr, signal->theData[2], signal->theData[3]);
314     break;
315   }
316   case LgmanContinueB::DROP_FILEGROUP:
317   {
318     jam();
319     Ptr<Logfile_group> ptr;
320     m_logfile_group_pool.getPtr(ptr, ptrI);
321     if ((ptr.p->m_state & Logfile_group::LG_THREAD_MASK) ||
322         ptr.p->m_outstanding_fs > 0)
323     {
324       jam();
325       sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
326 			  signal->length());
327       break;
328     }
329     Uint32 ref = signal->theData[2];
330     Uint32 data = signal->theData[3];
331     drop_filegroup_drop_files(signal, ptr, ref, data);
332     break;
333   }
334   }
335   client_unlock(number(), __LINE__);
336 }
337 
338 void
execNODE_FAILREP(Signal * signal)339 Lgman::execNODE_FAILREP(Signal* signal)
340 {
341   jamEntry();
342   const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
343   NdbNodeBitmask failed;
344   failed.assign(NdbNodeBitmask::Size, rep->theNodes);
345 
346   /* Block level cleanup */
347   for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
348     jam();
349     if(failed.get(i)) {
350       jam();
351       Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
352       ndbassert(elementsCleaned == 0); // No distributed fragmented signals
353       (void) elementsCleaned; // Remove compiler warning
354     }//if
355   }//for
356 }
357 
358 void
execDUMP_STATE_ORD(Signal * signal)359 Lgman::execDUMP_STATE_ORD(Signal* signal){
360   jamEntry();
361   if (signal->theData[0] == 12001 || signal->theData[0] == 12002)
362   {
363     char tmp[1024];
364     Ptr<Logfile_group> ptr;
365     m_logfile_group_list.first(ptr);
366     while(!ptr.isNull())
367     {
368       BaseString::snprintf(tmp, sizeof(tmp),
369                            "lfg %u state: %x fs: %u lsn "
370                            " [ last: %llu s(req): %llu s:ed: %llu lcp: %llu ] "
371                            " waiters: %d %d",
372                            ptr.p->m_logfile_group_id, ptr.p->m_state,
373                            ptr.p->m_outstanding_fs,
374                            ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn,
375                            ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn,
376                            !ptr.p->m_log_buffer_waiters.isEmpty(),
377                            !ptr.p->m_log_sync_waiters.isEmpty());
378       if (signal->theData[0] == 12001)
379         infoEvent("%s", tmp);
380       ndbout_c("%s", tmp);
381 
382       BaseString::snprintf(tmp, sizeof(tmp),
383                            "   callback_buffer_words: %u"
384                            " free_buffer_words: %u free_file_words: %llu",
385                            ptr.p->m_callback_buffer_words,
386                            ptr.p->m_free_buffer_words,
387                            ptr.p->m_free_file_words);
388       if (signal->theData[0] == 12001)
389         infoEvent("%s", tmp);
390       ndbout_c("%s", tmp);
391       if (!ptr.p->m_log_buffer_waiters.isEmpty())
392       {
393 	Ptr<Log_waiter> waiter;
394 	Local_log_waiter_list
395 	  list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
396 	list.first(waiter);
397         BaseString::snprintf(tmp, sizeof(tmp),
398                              "  head(waiters).sz: %u %u",
399                              waiter.p->m_size,
400                              FREE_BUFFER_MARGIN);
401         if (signal->theData[0] == 12001)
402           infoEvent("%s", tmp);
403         ndbout_c("%s", tmp);
404       }
405       if (!ptr.p->m_log_sync_waiters.isEmpty())
406       {
407 	Ptr<Log_waiter> waiter;
408 	Local_log_waiter_list
409 	  list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
410 	list.first(waiter);
411         BaseString::snprintf(tmp, sizeof(tmp),
412                              "  m_last_synced_lsn: %llu head(waiters %x).m_sync_lsn: %llu",
413                              ptr.p->m_last_synced_lsn,
414                              waiter.i,
415                              waiter.p->m_sync_lsn);
416         if (signal->theData[0] == 12001)
417           infoEvent("%s", tmp);
418         ndbout_c("%s", tmp);
419 
420 	while(!waiter.isNull())
421 	{
422 	  ndbout_c("ptr: %x %p lsn: %llu next: %x",
423 		   waiter.i, waiter.p, waiter.p->m_sync_lsn, waiter.p->nextList);
424 	  list.next(waiter);
425 	}
426       }
427       m_logfile_group_list.next(ptr);
428     }
429   }
430   if (signal->theData[0] == 12003)
431   {
432     bool crash = false;
433     Ptr<Logfile_group> ptr;
434     for (m_logfile_group_list.first(ptr); !ptr.isNull();
435          m_logfile_group_list.next(ptr))
436     {
437       if (ptr.p->m_callback_buffer_words != 0)
438       {
439         crash = true;
440         break;
441       }
442     }
443 
444     if (crash)
445     {
446       ndbout_c("Detected logfile-group with non zero m_callback_buffer_words");
447       signal->theData[0] = 12002;
448       execDUMP_STATE_ORD(signal);
449       ndbrequire(false);
450     }
451 #ifdef VM_TRACE
452     else
453     {
454       ndbout_c("Check for non zero m_callback_buffer_words OK!");
455     }
456 #endif
457   }
458 }
459 
460 void
execDBINFO_SCANREQ(Signal * signal)461 Lgman::execDBINFO_SCANREQ(Signal *signal)
462 {
463   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
464   const Ndbinfo::ScanCursor* cursor =
465     CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
466   Ndbinfo::Ratelimit rl;
467 
468   jamEntry();
469 
470   switch(req.tableId) {
471   case Ndbinfo::LOGSPACES_TABLEID:
472   {
473     jam();
474     Uint32 startBucket = cursor->data[0];
475     Logfile_group_hash_iterator iter;
476     m_logfile_group_hash.next(startBucket, iter);
477 
478     while (!iter.curr.isNull())
479     {
480       jam();
481 
482       Uint32 currentBucket = iter.bucket;
483       Ptr<Logfile_group> ptr = iter.curr;
484 
485       Uint64 free = ptr.p->m_free_file_words*4;
486 
487       Uint64 total = 0;
488       Local_undofile_list list(m_file_pool, ptr.p->m_files);
489       Ptr<Undofile> filePtr;
490       for (list.first(filePtr); !filePtr.isNull(); list.next(filePtr))
491       {
492         jam();
493         total += (Uint64)filePtr.p->m_file_size *
494           (Uint64)File_formats::NDB_PAGE_SIZE;
495       }
496 
497       Uint64 high = 0; // TODO
498 
499       Ndbinfo::Row row(signal, req);
500       row.write_uint32(getOwnNodeId());
501       row.write_uint32(1); // log type, 1 = DD-UNDO
502       row.write_uint32(ptr.p->m_logfile_group_id); // log id
503       row.write_uint32(0); // log part
504 
505       row.write_uint64(total);          // total allocated
506       row.write_uint64((total-free));   // currently in use
507       row.write_uint64(high);           // in use high water mark
508       ndbinfo_send_row(signal, req, row, rl);
509 
510       // move to next
511       if (m_logfile_group_hash.next(iter) == false)
512       {
513         jam(); // no more...
514         break;
515       }
516       else if (iter.bucket == currentBucket)
517       {
518         jam();
519         continue; // we need to iterate an entire bucket
520       }
521       else if (rl.need_break(req))
522       {
523         jam();
524         ndbinfo_send_scan_break(signal, req, rl, iter.bucket);
525         return;
526       }
527     }
528     break;
529   }
530 
531   case Ndbinfo::LOGBUFFERS_TABLEID:
532   {
533     jam();
534     Uint32 startBucket = cursor->data[0];
535     Logfile_group_hash_iterator iter;
536     m_logfile_group_hash.next(startBucket, iter);
537 
538     while (!iter.curr.isNull())
539     {
540       jam();
541 
542       Uint32 currentBucket = iter.bucket;
543       Ptr<Logfile_group> ptr = iter.curr;
544 
545       Uint64 free = ptr.p->m_free_buffer_words*4;
546       Uint64 total = ptr.p->m_total_buffer_words*4;
547       Uint64 high = 0; // TODO
548 
549       Ndbinfo::Row row(signal, req);
550       row.write_uint32(getOwnNodeId());
551       row.write_uint32(1); // log type, 1 = DD-UNDO
552       row.write_uint32(ptr.p->m_logfile_group_id); // log id
553       row.write_uint32(0); // log part
554 
555       row.write_uint64(total);          // total allocated
556       row.write_uint64((total-free));   // currently in use
557       row.write_uint64(high);           // in use high water mark
558       ndbinfo_send_row(signal, req, row, rl);
559 
560       // move to next
561       if (m_logfile_group_hash.next(iter) == false)
562       {
563         jam(); // no more...
564         break;
565       }
566       else if (iter.bucket == currentBucket)
567       {
568         jam();
569         continue; // we need to iterate an entire bucket
570       }
571       else if (rl.need_break(req))
572       {
573         jam();
574         ndbinfo_send_scan_break(signal, req, rl, iter.bucket);
575         return;
576       }
577     }
578     break;
579   }
580 
581   default:
582     break;
583   }
584 
585   ndbinfo_send_scan_conf(signal, req, rl);
586 }
587 
588 void
execCREATE_FILEGROUP_IMPL_REQ(Signal * signal)589 Lgman::execCREATE_FILEGROUP_IMPL_REQ(Signal* signal){
590   jamEntry();
591   CreateFilegroupImplReq* req= (CreateFilegroupImplReq*)signal->getDataPtr();
592 
593   Uint32 senderRef = req->senderRef;
594   Uint32 senderData = req->senderData;
595 
596   Ptr<Logfile_group> ptr;
597   CreateFilegroupImplRef::ErrorCode err = CreateFilegroupImplRef::NoError;
598   do {
599     if (m_logfile_group_hash.find(ptr, req->filegroup_id))
600     {
601       jam();
602       err = CreateFilegroupImplRef::FilegroupAlreadyExists;
603       break;
604     }
605 
606     if (!m_logfile_group_list.isEmpty())
607     {
608       jam();
609       err = CreateFilegroupImplRef::OneLogfileGroupLimit;
610       break;
611     }
612 
613     if (!m_logfile_group_pool.seize(ptr))
614     {
615       jam();
616       err = CreateFilegroupImplRef::OutOfFilegroupRecords;
617       break;
618     }
619 
620     new (ptr.p) Logfile_group(req);
621 
622     if (!alloc_logbuffer_memory(ptr, req->logfile_group.buffer_size))
623     {
624       jam();
625       err= CreateFilegroupImplRef::OutOfLogBufferMemory;
626       m_logfile_group_pool.release(ptr);
627       break;
628     }
629 
630     m_logfile_group_hash.add(ptr);
631     m_logfile_group_list.add(ptr);
632 
633     if ((getNodeState().getNodeRestartInProgress() &&
634          getNodeState().starting.restartType !=
635          NodeState::ST_INITIAL_NODE_RESTART)||
636         getNodeState().getSystemRestartInProgress())
637     {
638       ptr.p->m_state = Logfile_group::LG_STARTING;
639     }
640 
641     CreateFilegroupImplConf* conf=
642       (CreateFilegroupImplConf*)signal->getDataPtr();
643     conf->senderData = senderData;
644     conf->senderRef = reference();
645     sendSignal(senderRef, GSN_CREATE_FILEGROUP_IMPL_CONF, signal,
646 	       CreateFilegroupImplConf::SignalLength, JBB);
647 
648     return;
649   } while(0);
650 
651   CreateFilegroupImplRef* ref= (CreateFilegroupImplRef*)signal->getDataPtr();
652   ref->senderData = senderData;
653   ref->senderRef = reference();
654   ref->errorCode = err;
655   sendSignal(senderRef, GSN_CREATE_FILEGROUP_IMPL_REF, signal,
656 	     CreateFilegroupImplRef::SignalLength, JBB);
657 }
658 
659 void
execDROP_FILEGROUP_IMPL_REQ(Signal * signal)660 Lgman::execDROP_FILEGROUP_IMPL_REQ(Signal* signal)
661 {
662   jamEntry();
663 
664   Uint32 errorCode = 0;
665   DropFilegroupImplReq req = *(DropFilegroupImplReq*)signal->getDataPtr();
666   do
667   {
668     Ptr<Logfile_group> ptr;
669     if (!m_logfile_group_hash.find(ptr, req.filegroup_id))
670     {
671       errorCode = DropFilegroupImplRef::NoSuchFilegroup;
672       break;
673     }
674 
675     if (ptr.p->m_version != req.filegroup_version)
676     {
677       errorCode = DropFilegroupImplRef::InvalidFilegroupVersion;
678       break;
679     }
680 
681     switch(req.requestInfo){
682     case DropFilegroupImplReq::Prepare:
683       break;
684     case DropFilegroupImplReq::Commit:
685       m_logfile_group_list.remove(ptr);
686       ptr.p->m_state |= Logfile_group::LG_DROPPING;
687       signal->theData[0] = LgmanContinueB::DROP_FILEGROUP;
688       signal->theData[1] = ptr.i;
689       signal->theData[2] = req.senderRef;
690       signal->theData[3] = req.senderData;
691       sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
692       return;
693     case DropFilegroupImplReq::Abort:
694       break;
695     default:
696       ndbrequire(false);
697     }
698   } while(0);
699 
700   if (errorCode)
701   {
702     DropFilegroupImplRef* ref =
703       (DropFilegroupImplRef*)signal->getDataPtrSend();
704     ref->senderRef = reference();
705     ref->senderData = req.senderData;
706     ref->errorCode = errorCode;
707     sendSignal(req.senderRef, GSN_DROP_FILEGROUP_IMPL_REF, signal,
708 	       DropFilegroupImplRef::SignalLength, JBB);
709   }
710   else
711   {
712     DropFilegroupImplConf* conf =
713       (DropFilegroupImplConf*)signal->getDataPtrSend();
714     conf->senderRef = reference();
715     conf->senderData = req.senderData;
716     sendSignal(req.senderRef, GSN_DROP_FILEGROUP_IMPL_CONF, signal,
717 	       DropFilegroupImplConf::SignalLength, JBB);
718   }
719 }
720 
721 void
drop_filegroup_drop_files(Signal * signal,Ptr<Logfile_group> ptr,Uint32 ref,Uint32 data)722 Lgman::drop_filegroup_drop_files(Signal* signal,
723 				 Ptr<Logfile_group> ptr,
724 				 Uint32 ref, Uint32 data)
725 {
726   jam();
727   ndbrequire(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
728   ndbrequire(ptr.p->m_outstanding_fs == 0);
729 
730   Local_undofile_list list(m_file_pool, ptr.p->m_files);
731   Ptr<Undofile> file_ptr;
732 
733   if (list.first(file_ptr))
734   {
735     jam();
736     ndbrequire(! (file_ptr.p->m_state & Undofile::FS_OUTSTANDING));
737     file_ptr.p->m_create.m_senderRef = ref;
738     file_ptr.p->m_create.m_senderData = data;
739     create_file_abort(signal, ptr, file_ptr);
740     return;
741   }
742 
743   Local_undofile_list metalist(m_file_pool, ptr.p->m_meta_files);
744   if (metalist.first(file_ptr))
745   {
746     jam();
747     metalist.remove(file_ptr);
748     list.add(file_ptr);
749     file_ptr.p->m_create.m_senderRef = ref;
750     file_ptr.p->m_create.m_senderData = data;
751     create_file_abort(signal, ptr, file_ptr);
752     return;
753   }
754 
755   free_logbuffer_memory(ptr);
756   m_logfile_group_hash.release(ptr);
757   DropFilegroupImplConf *conf = (DropFilegroupImplConf*)signal->getDataPtr();
758   conf->senderData = data;
759   conf->senderRef = reference();
760   sendSignal(ref, GSN_DROP_FILEGROUP_IMPL_CONF, signal,
761 	     DropFilegroupImplConf::SignalLength, JBB);
762 }
763 
764 void
execCREATE_FILE_IMPL_REQ(Signal * signal)765 Lgman::execCREATE_FILE_IMPL_REQ(Signal* signal)
766 {
767   jamEntry();
768   CreateFileImplReq* req= (CreateFileImplReq*)signal->getDataPtr();
769 
770   Uint32 senderRef = req->senderRef;
771   Uint32 senderData = req->senderData;
772   Uint32 requestInfo = req->requestInfo;
773 
774   Ptr<Logfile_group> ptr;
775   CreateFileImplRef::ErrorCode err = CreateFileImplRef::NoError;
776   SectionHandle handle(this, signal);
777   do {
778     if (!m_logfile_group_hash.find(ptr, req->filegroup_id))
779     {
780       jam();
781       err = CreateFileImplRef::InvalidFilegroup;
782       break;
783     }
784 
785     if (ptr.p->m_version != req->filegroup_version)
786     {
787       jam();
788       err = CreateFileImplRef::InvalidFilegroupVersion;
789       break;
790     }
791 
792     Ptr<Undofile> file_ptr;
793     switch(requestInfo){
794     case CreateFileImplReq::Commit:
795     {
796       jam();
797       ndbrequire(find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id));
798       file_ptr.p->m_create.m_senderRef = req->senderRef;
799       file_ptr.p->m_create.m_senderData = req->senderData;
800       create_file_commit(signal, ptr, file_ptr);
801       return;
802     }
803     case CreateFileImplReq::Abort:
804     {
805       Uint32 senderRef = req->senderRef;
806       Uint32 senderData = req->senderData;
807       if (find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id))
808       {
809         jam();
810 	file_ptr.p->m_create.m_senderRef = senderRef;
811 	file_ptr.p->m_create.m_senderData = senderData;
812 	create_file_abort(signal, ptr, file_ptr);
813       }
814       else
815       {
816 	CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
817         jam();
818 	conf->senderData = senderData;
819 	conf->senderRef = reference();
820 	sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
821 		   CreateFileImplConf::SignalLength, JBB);
822       }
823       return;
824     }
825     default: // prepare
826       break;
827     }
828 
829     if (!m_file_pool.seize(file_ptr))
830     {
831       jam();
832       err = CreateFileImplRef::OutOfFileRecords;
833       break;
834     }
835 
836     if (!handle.m_cnt == 1)
837     {
838       ndbrequire(false);
839     }
840 
841     if (ERROR_INSERTED(15000) ||
842         (sizeof(void*) == 4 && req->file_size_hi & 0xFFFFFFFF))
843     {
844       jam();
845       err = CreateFileImplRef::FileSizeTooLarge;
846       break;
847     }
848 
849     Uint64 sz = (Uint64(req->file_size_hi) << 32) + req->file_size_lo;
850     if (sz < 1024*1024)
851     {
852       jam();
853       err = CreateFileImplRef::FileSizeTooSmall;
854       break;
855     }
856 
857     new (file_ptr.p) Undofile(req, ptr.i);
858 
859     Local_undofile_list tmp(m_file_pool, ptr.p->m_meta_files);
860     tmp.add(file_ptr);
861 
862     open_file(signal, file_ptr, req->requestInfo, &handle);
863     return;
864   } while(0);
865 
866   releaseSections(handle);
867   CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
868   ref->senderData = senderData;
869   ref->senderRef = reference();
870   ref->errorCode = err;
871   sendSignal(senderRef, GSN_CREATE_FILE_IMPL_REF, signal,
872 	     CreateFileImplRef::SignalLength, JBB);
873 }
874 
875 void
open_file(Signal * signal,Ptr<Undofile> ptr,Uint32 requestInfo,SectionHandle * handle)876 Lgman::open_file(Signal* signal, Ptr<Undofile> ptr,
877 		 Uint32 requestInfo,
878 		 SectionHandle * handle)
879 {
880   FsOpenReq* req = (FsOpenReq*)signal->getDataPtrSend();
881   req->userReference = reference();
882   req->userPointer = ptr.i;
883 
884   memset(req->fileNumber, 0, sizeof(req->fileNumber));
885   FsOpenReq::setVersion(req->fileNumber, 4); // Version 4 = specified filename
886   FsOpenReq::v4_setBasePath(req->fileNumber, FsOpenReq::BP_DD_UF);
887 
888   req->fileFlags = 0;
889   req->fileFlags |= FsOpenReq::OM_READWRITE;
890   req->fileFlags |= FsOpenReq::OM_DIRECT;
891   req->fileFlags |= FsOpenReq::OM_SYNC;
892   switch(requestInfo){
893   case CreateFileImplReq::Create:
894     req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE;
895     req->fileFlags |= FsOpenReq::OM_INIT;
896     ptr.p->m_state = Undofile::FS_CREATING;
897     break;
898   case CreateFileImplReq::CreateForce:
899     req->fileFlags |= FsOpenReq::OM_CREATE;
900     req->fileFlags |= FsOpenReq::OM_INIT;
901     ptr.p->m_state = Undofile::FS_CREATING;
902     break;
903   case CreateFileImplReq::Open:
904     req->fileFlags |= FsOpenReq::OM_CHECK_SIZE;
905     ptr.p->m_state = Undofile::FS_OPENING;
906     break;
907   default:
908     ndbrequire(false);
909   }
910 
911   req->page_size = File_formats::NDB_PAGE_SIZE;
912   Uint64 size = (Uint64)ptr.p->m_file_size * (Uint64)File_formats::NDB_PAGE_SIZE;
913   req->file_size_hi = (Uint32)(size >> 32);
914   req->file_size_lo = (Uint32)(size & 0xFFFFFFFF);
915 
916   sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBB,
917 	     handle);
918 }
919 
920 void
execFSWRITEREQ(Signal * signal)921 Lgman::execFSWRITEREQ(Signal* signal)
922 {
923   jamEntry();
924   Ptr<Undofile> ptr;
925   Ptr<GlobalPage> page_ptr;
926   FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtr();
927 
928   m_file_pool.getPtr(ptr, req->userPointer);
929   m_shared_page_pool.getPtr(page_ptr, req->data.pageData[0]);
930 
931   if (req->varIndex == 0)
932   {
933     File_formats::Undofile::Zero_page* page =
934       (File_formats::Undofile::Zero_page*)page_ptr.p;
935     page->m_page_header.init(File_formats::FT_Undofile,
936 			     getOwnNodeId(),
937 			     ndbGetOwnVersion(),
938 			     (Uint32)time(0));
939     page->m_file_id = ptr.p->m_file_id;
940     page->m_logfile_group_id = ptr.p->m_create.m_logfile_group_id;
941     page->m_logfile_group_version = ptr.p->m_create.m_logfile_group_version;
942     page->m_undo_pages = ptr.p->m_file_size - 1; // minus zero page
943   }
944   else
945   {
946     File_formats::Undofile::Undo_page* page =
947       (File_formats::Undofile::Undo_page*)page_ptr.p;
948     page->m_page_header.m_page_lsn_hi = 0;
949     page->m_page_header.m_page_lsn_lo = 0;
950     page->m_page_header.m_page_type = File_formats::PT_Undopage;
951     page->m_words_used = 0;
952   }
953 }
954 
955 void
execFSOPENREF(Signal * signal)956 Lgman::execFSOPENREF(Signal* signal)
957 {
958   jamEntry();
959 
960   Ptr<Undofile> ptr;
961   Ptr<Logfile_group> lg_ptr;
962   FsRef* ref = (FsRef*)signal->getDataPtr();
963 
964   Uint32 errCode = ref->errorCode;
965   Uint32 osErrCode = ref->osErrorCode;
966 
967   m_file_pool.getPtr(ptr, ref->userPointer);
968   m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
969 
970   {
971     CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
972     ref->senderData = ptr.p->m_create.m_senderData;
973     ref->senderRef = reference();
974     ref->errorCode = CreateFileImplRef::FileError;
975     ref->fsErrCode = errCode;
976     ref->osErrCode = osErrCode;
977 
978     sendSignal(ptr.p->m_create.m_senderRef, GSN_CREATE_FILE_IMPL_REF, signal,
979 	       CreateFileImplRef::SignalLength, JBB);
980   }
981 
982   Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
983   meta.release(ptr);
984 }
985 
// Symbolic names for 0 and 1.
// NOTE(review): presumably indices into a head/tail position pair; the uses
// are not in the visible part of this file -- confirm at the use sites.
#define HEAD 0
#define TAIL 1
988 
989 void
execFSOPENCONF(Signal * signal)990 Lgman::execFSOPENCONF(Signal* signal)
991 {
992   jamEntry();
993   Ptr<Undofile> ptr;
994 
995   FsConf* conf = (FsConf*)signal->getDataPtr();
996 
997   Uint32 fd = conf->filePointer;
998   m_file_pool.getPtr(ptr, conf->userPointer);
999 
1000   ptr.p->m_fd = fd;
1001 
1002   {
1003     Uint32 senderRef = ptr.p->m_create.m_senderRef;
1004     Uint32 senderData = ptr.p->m_create.m_senderData;
1005 
1006     CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
1007     conf->senderData = senderData;
1008     conf->senderRef = reference();
1009     sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
1010 	       CreateFileImplConf::SignalLength, JBB);
1011   }
1012 }
1013 
1014 bool
find_file_by_id(Ptr<Undofile> & ptr,Local_undofile_list::Head & head,Uint32 id)1015 Lgman::find_file_by_id(Ptr<Undofile>& ptr,
1016 		       Local_undofile_list::Head& head, Uint32 id)
1017 {
1018   Local_undofile_list list(m_file_pool, head);
1019   for(list.first(ptr); !ptr.isNull(); list.next(ptr))
1020     if(ptr.p->m_file_id == id)
1021       return true;
1022   return false;
1023 }
1024 
1025 void
create_file_commit(Signal * signal,Ptr<Logfile_group> lg_ptr,Ptr<Undofile> ptr)1026 Lgman::create_file_commit(Signal* signal,
1027 			  Ptr<Logfile_group> lg_ptr,
1028 			  Ptr<Undofile> ptr)
1029 {
1030   Uint32 senderRef = ptr.p->m_create.m_senderRef;
1031   Uint32 senderData = ptr.p->m_create.m_senderData;
1032 
1033   bool first= false;
1034   if(ptr.p->m_state == Undofile::FS_CREATING &&
1035      (lg_ptr.p->m_state & Logfile_group::LG_ONLINE))
1036   {
1037     jam();
1038     Local_undofile_list free(m_file_pool, lg_ptr.p->m_files);
1039     Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
1040     first= free.isEmpty();
1041     meta.remove(ptr);
1042     if(!first)
1043     {
1044       /**
1045        * Add log file next after current head
1046        */
1047       Ptr<Undofile> curr;
1048       m_file_pool.getPtr(curr, lg_ptr.p->m_file_pos[HEAD].m_ptr_i);
1049       if(free.next(curr))
1050 	free.insert(ptr, curr); // inserts before (that's why the extra next)
1051       else
1052 	free.add(ptr);
1053 
1054       ptr.p->m_state = Undofile::FS_ONLINE | Undofile::FS_EMPTY;
1055     }
1056     else
1057     {
1058       /**
1059        * First file isn't empty as it can be written to at any time
1060        */
1061       free.add(ptr);
1062       ptr.p->m_state = Undofile::FS_ONLINE;
1063       lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
1064       signal->theData[0] = LgmanContinueB::FLUSH_LOG;
1065       signal->theData[1] = lg_ptr.i;
1066       signal->theData[2] = 0;
1067       sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
1068     }
1069   }
1070   else
1071   {
1072     ptr.p->m_state = Undofile::FS_SORTING;
1073   }
1074 
1075   ptr.p->m_online.m_lsn = 0;
1076   ptr.p->m_online.m_outstanding = 0;
1077 
1078   Uint64 add= ptr.p->m_file_size - 1;
1079   lg_ptr.p->m_free_file_words += add * File_formats::UNDO_PAGE_WORDS;
1080 
1081   if(first)
1082   {
1083     jam();
1084 
1085     Buffer_idx tmp= { ptr.i, 0 };
1086     lg_ptr.p->m_file_pos[HEAD] = lg_ptr.p->m_file_pos[TAIL] = tmp;
1087 
1088     /**
1089      * Init log tail pointer
1090      */
1091     lg_ptr.p->m_tail_pos[0] = tmp;
1092     lg_ptr.p->m_tail_pos[1] = tmp;
1093     lg_ptr.p->m_tail_pos[2] = tmp;
1094     lg_ptr.p->m_next_reply_ptr_i = ptr.i;
1095   }
1096 
1097   validate_logfile_group(lg_ptr, "create_file_commit");
1098 
1099   CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
1100   conf->senderData = senderData;
1101   conf->senderRef = reference();
1102   sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
1103 	     CreateFileImplConf::SignalLength, JBB);
1104 }
1105 
1106 void
create_file_abort(Signal * signal,Ptr<Logfile_group> lg_ptr,Ptr<Undofile> ptr)1107 Lgman::create_file_abort(Signal* signal,
1108 			 Ptr<Logfile_group> lg_ptr,
1109 			 Ptr<Undofile> ptr)
1110 {
1111   if (ptr.p->m_fd == RNIL)
1112   {
1113     ((FsConf*)signal->getDataPtr())->userPointer = ptr.i;
1114     execFSCLOSECONF(signal);
1115     return;
1116   }
1117 
1118   FsCloseReq *req= (FsCloseReq*)signal->getDataPtrSend();
1119   req->filePointer = ptr.p->m_fd;
1120   req->userReference = reference();
1121   req->userPointer = ptr.i;
1122   req->fileFlag = 0;
1123   FsCloseReq::setRemoveFileFlag(req->fileFlag, true);
1124 
1125   sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal,
1126 	     FsCloseReq::SignalLength, JBB);
1127 }
1128 
1129 void
execFSCLOSECONF(Signal * signal)1130 Lgman::execFSCLOSECONF(Signal* signal)
1131 {
1132   Ptr<Undofile> ptr;
1133   Ptr<Logfile_group> lg_ptr;
1134   Uint32 ptrI = ((FsConf*)signal->getDataPtr())->userPointer;
1135   m_file_pool.getPtr(ptr, ptrI);
1136 
1137   Uint32 senderRef = ptr.p->m_create.m_senderRef;
1138   Uint32 senderData = ptr.p->m_create.m_senderData;
1139 
1140   m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
1141 
1142   if (lg_ptr.p->m_state & Logfile_group::LG_DROPPING)
1143   {
1144     jam();
1145     {
1146       Local_undofile_list list(m_file_pool, lg_ptr.p->m_files);
1147       list.release(ptr);
1148     }
1149     drop_filegroup_drop_files(signal, lg_ptr, senderRef, senderData);
1150   }
1151   else
1152   {
1153     jam();
1154     Local_undofile_list list(m_file_pool, lg_ptr.p->m_meta_files);
1155     list.release(ptr);
1156 
1157     CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
1158     conf->senderData = senderData;
1159     conf->senderRef = reference();
1160     sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
1161 	       CreateFileImplConf::SignalLength, JBB);
1162   }
1163 }
1164 
1165 void
execDROP_FILE_IMPL_REQ(Signal * signal)1166 Lgman::execDROP_FILE_IMPL_REQ(Signal* signal)
1167 {
1168   jamEntry();
1169   ndbrequire(false);
1170 }
1171 
/**
 * Indexes into Logfile_group::m_pos[] (positions in the log buffer)
 *   CONSUMER - position flush_log() writes buffer pages to file from
 *   PRODUCER - position get_log_buffer() hands out buffer space at
 */
#define CONSUMER 0
#define PRODUCER 1
1174 
Logfile_group(const CreateFilegroupImplReq * req)1175 Lgman::Logfile_group::Logfile_group(const CreateFilegroupImplReq* req)
1176 {
1177   m_logfile_group_id = req->filegroup_id;
1178   m_version = req->filegroup_version;
1179   m_state = LG_ONLINE;
1180   m_outstanding_fs = 0;
1181   m_next_reply_ptr_i = RNIL;
1182 
1183   m_last_lsn = 0;
1184   m_last_synced_lsn = 0;
1185   m_last_sync_req_lsn = 0;
1186   m_max_sync_req_lsn = 0;
1187   m_last_read_lsn = 0;
1188   m_file_pos[0].m_ptr_i= m_file_pos[1].m_ptr_i = RNIL;
1189 
1190   m_free_file_words = 0;
1191   m_total_buffer_words = 0;
1192   m_free_buffer_words = 0;
1193   m_callback_buffer_words = 0;
1194 
1195   m_pos[CONSUMER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
1196   m_pos[CONSUMER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
1197   m_pos[PRODUCER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
1198   m_pos[PRODUCER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
1199 
1200   m_tail_pos[2].m_ptr_i= RNIL;
1201   m_tail_pos[2].m_idx= ~0;
1202 
1203   m_tail_pos[0] = m_tail_pos[1] = m_tail_pos[2];
1204 }
1205 
1206 bool
alloc_logbuffer_memory(Ptr<Logfile_group> ptr,Uint32 bytes)1207 Lgman::alloc_logbuffer_memory(Ptr<Logfile_group> ptr, Uint32 bytes)
1208 {
1209   Uint32 pages= (((bytes + 3) >> 2) + File_formats::NDB_PAGE_SIZE_WORDS - 1)
1210     / File_formats::NDB_PAGE_SIZE_WORDS;
1211   Uint32 requested= pages;
1212   {
1213     Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1214     while(pages)
1215     {
1216       Uint32 ptrI;
1217       Uint32 cnt = pages > 64 ? 64 : pages;
1218       m_ctx.m_mm.alloc_pages(RG_DISK_OPERATIONS, &ptrI, &cnt, 1);
1219       if (cnt)
1220       {
1221 	Buffer_idx range;
1222 	range.m_ptr_i= ptrI;
1223 	range.m_idx = cnt;
1224 
1225 	if (map.append((Uint32*)&range, 2) == false)
1226         {
1227           /**
1228            * Failed to append page-range...
1229            *   jump out of alloc routine
1230            */
1231           jam();
1232           m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS,
1233                                    range.m_ptr_i, range.m_idx);
1234           break;
1235         }
1236 	pages -= range.m_idx;
1237       }
1238       else
1239       {
1240 	break;
1241       }
1242     }
1243   }
1244 
1245   if(pages)
1246   {
1247     /* Could not allocate all of the requested memory.
1248      * So release that already allocated.
1249      */
1250     free_logbuffer_memory(ptr);
1251     return false;
1252   }
1253 
1254 #if defined VM_TRACE || defined ERROR_INSERT
1255   ndbout << "DD lgman: fg id:" << ptr.p->m_logfile_group_id << " undo buffer pages/bytes:" << (requested-pages) << "/" << (requested-pages)*File_formats::NDB_PAGE_SIZE << endl;
1256 #endif
1257 
1258   init_logbuffer_pointers(ptr);
1259   return true;
1260 }
1261 
1262 void
init_logbuffer_pointers(Ptr<Logfile_group> ptr)1263 Lgman::init_logbuffer_pointers(Ptr<Logfile_group> ptr)
1264 {
1265   Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1266   Page_map::Iterator it;
1267   union {
1268     Uint32 tmp[2];
1269     Buffer_idx range;
1270   };
1271 
1272   map.first(it);
1273   tmp[0] = *it.data;
1274   ndbrequire(map.next(it));
1275   tmp[1] = *it.data;
1276 
1277   ptr.p->m_pos[CONSUMER].m_current_page.m_ptr_i = 0;      // Index in page map
1278   ptr.p->m_pos[CONSUMER].m_current_page.m_idx = range.m_idx - 1;// left range
1279   ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
1280   ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = 0;               // Page pos
1281 
1282   ptr.p->m_pos[PRODUCER].m_current_page.m_ptr_i = 0;      // Index in page map
1283   ptr.p->m_pos[PRODUCER].m_current_page.m_idx = range.m_idx - 1;// left range
1284   ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
1285   ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;               // Page pos
1286 
1287   Uint32 pages= range.m_idx;
1288   while(map.next(it))
1289   {
1290     tmp[0] = *it.data;
1291     ndbrequire(map.next(it));
1292     tmp[1] = *it.data;
1293     pages += range.m_idx;
1294   }
1295 
1296   ptr.p->m_total_buffer_words =
1297     ptr.p->m_free_buffer_words = pages * File_formats::UNDO_PAGE_WORDS;
1298 }
1299 
1300 Uint32
compute_free_file_pages(Ptr<Logfile_group> ptr)1301 Lgman::compute_free_file_pages(Ptr<Logfile_group> ptr)
1302 {
1303   Buffer_idx head= ptr.p->m_file_pos[HEAD];
1304   Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1305   Uint32 pages = 0;
1306   if (head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx)
1307   {
1308     pages += tail.m_idx - head.m_idx;
1309   }
1310   else
1311   {
1312     Ptr<Undofile> file;
1313     m_file_pool.getPtr(file, head.m_ptr_i);
1314     Local_undofile_list list(m_file_pool, ptr.p->m_files);
1315 
1316     do
1317     {
1318       pages += (file.p->m_file_size - head.m_idx - 1);
1319       if(!list.next(file))
1320 	list.first(file);
1321       head.m_idx = 0;
1322     } while(file.i != tail.m_ptr_i);
1323 
1324     pages += tail.m_idx - head.m_idx;
1325   }
1326   return pages;
1327 }
1328 
1329 void
free_logbuffer_memory(Ptr<Logfile_group> ptr)1330 Lgman::free_logbuffer_memory(Ptr<Logfile_group> ptr)
1331 {
1332   union {
1333     Uint32 tmp[2];
1334     Buffer_idx range;
1335   };
1336 
1337   Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1338 
1339   Page_map::Iterator it;
1340   map.first(it);
1341   while(!it.isNull())
1342   {
1343     tmp[0] = *it.data;
1344     ndbrequire(map.next(it));
1345     tmp[1] = *it.data;
1346 
1347     m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS, range.m_ptr_i, range.m_idx);
1348     map.next(it);
1349   }
1350   map.release();
1351 }
1352 
Undofile(const struct CreateFileImplReq * req,Uint32 ptrI)1353 Lgman::Undofile::Undofile(const struct CreateFileImplReq* req, Uint32 ptrI)
1354 {
1355   m_fd = RNIL;
1356   m_file_id = req->file_id;
1357   m_logfile_group_ptr_i= ptrI;
1358 
1359   Uint64 pages = req->file_size_hi;
1360   pages = (pages << 32) | req->file_size_lo;
1361   pages /= GLOBAL_PAGE_SIZE;
1362   m_file_size = Uint32(pages);
1363 #if defined VM_TRACE || defined ERROR_INSERT
1364   ndbout << "DD lgman: file id:" << m_file_id << " undofile pages/bytes:" << m_file_size << "/" << m_file_size*GLOBAL_PAGE_SIZE << endl;
1365 #endif
1366 
1367   m_create.m_senderRef = req->senderRef; // During META
1368   m_create.m_senderData = req->senderData; // During META
1369   m_create.m_logfile_group_id = req->filegroup_id;
1370 }
1371 
Logfile_client(SimulatedBlock * block,Lgman * lgman,Uint32 logfile_group_id,bool lock)1372 Logfile_client::Logfile_client(SimulatedBlock* block,
1373 			       Lgman* lgman, Uint32 logfile_group_id,
1374                                bool lock)
1375 {
1376   Uint32 bno = block->number();
1377   Uint32 ino = block->instance();
1378   m_client_block= block;
1379   m_block= numberToBlock(bno, ino);
1380   m_lgman= lgman;
1381   m_lock = lock;
1382   m_logfile_group_id= logfile_group_id;
1383   D("client ctor " << bno << "/" << ino);
1384   if (m_lock)
1385     m_lgman->client_lock(m_block, 0);
1386 }
1387 
~Logfile_client()1388 Logfile_client::~Logfile_client()
1389 {
1390 #ifdef VM_TRACE
1391   Uint32 bno = blockToMain(m_block);
1392   Uint32 ino = blockToInstance(m_block);
1393 #endif
1394   D("client dtor " << bno << "/" << ino);
1395   if (m_lock)
1396     m_lgman->client_unlock(m_block, 0);
1397 }
1398 
1399 int
sync_lsn(Signal * signal,Uint64 lsn,Request * req,Uint32 flags)1400 Logfile_client::sync_lsn(Signal* signal,
1401 			 Uint64 lsn, Request* req, Uint32 flags)
1402 {
1403   Ptr<Lgman::Logfile_group> ptr;
1404   if(m_lgman->m_logfile_group_list.first(ptr))
1405   {
1406     if(ptr.p->m_last_synced_lsn >= lsn)
1407     {
1408       return 1;
1409     }
1410 
1411     bool empty= false;
1412     Ptr<Lgman::Log_waiter> wait;
1413     {
1414       Lgman::Local_log_waiter_list
1415 	list(m_lgman->m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1416 
1417       empty= list.isEmpty();
1418       if(!list.seize(wait))
1419 	return -1;
1420 
1421       wait.p->m_block= m_block;
1422       wait.p->m_sync_lsn= lsn;
1423       memcpy(&wait.p->m_callback, &req->m_callback,
1424 	     sizeof(SimulatedBlock::CallbackPtr));
1425 
1426       ptr.p->m_max_sync_req_lsn = lsn > ptr.p->m_max_sync_req_lsn ?
1427 	lsn : ptr.p->m_max_sync_req_lsn;
1428     }
1429 
1430     if(ptr.p->m_last_sync_req_lsn < lsn &&
1431        ! (ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD))
1432     {
1433       ptr.p->m_state |= Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1434       signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1435       signal->theData[1] = ptr.i;
1436       signal->theData[2] = (Uint32)(lsn >> 32);
1437       signal->theData[3] = (Uint32)(lsn & 0xFFFFFFFF);
1438       m_client_block->sendSignalWithDelay(m_lgman->reference(),
1439                                           GSN_CONTINUEB, signal, 10, 4);
1440     }
1441     return 0;
1442   }
1443   return -1;
1444 }
1445 
1446 void
force_log_sync(Signal * signal,Ptr<Logfile_group> ptr,Uint32 lsn_hi,Uint32 lsn_lo)1447 Lgman::force_log_sync(Signal* signal,
1448 		      Ptr<Logfile_group> ptr,
1449 		      Uint32 lsn_hi, Uint32 lsn_lo)
1450 {
1451   Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1452   Uint64 force_lsn = lsn_hi; force_lsn <<= 32; force_lsn += lsn_lo;
1453 
1454   if(ptr.p->m_last_sync_req_lsn < force_lsn)
1455   {
1456     /**
1457      * Do force
1458      */
1459     Buffer_idx pos= ptr.p->m_pos[PRODUCER].m_current_pos;
1460     GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);
1461 
1462     Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
1463     if(pos.m_idx) // don't flush empty page...
1464     {
1465       Uint64 lsn= ptr.p->m_last_lsn - 1;
1466 
1467       File_formats::Undofile::Undo_page* undo=
1468 	(File_formats::Undofile::Undo_page*)page;
1469       undo->m_page_header.m_page_lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
1470       undo->m_page_header.m_page_lsn_hi = (Uint32)(lsn >> 32);
1471       undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1472 
1473       /**
1474        * Update free space with extra NOOP
1475        */
1476       ndbrequire(ptr.p->m_free_file_words >= free);
1477       ndbrequire(ptr.p->m_free_buffer_words > free);
1478       ptr.p->m_free_file_words -= free;
1479       ptr.p->m_free_buffer_words -= free;
1480 
1481       validate_logfile_group(ptr, "force_log_sync");
1482 
1483       next_page(ptr.p, PRODUCER);
1484       ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
1485     }
1486   }
1487 
1488 
1489 
1490   Uint64 max_req_lsn = ptr.p->m_max_sync_req_lsn;
1491   if(max_req_lsn > force_lsn &&
1492      max_req_lsn > ptr.p->m_last_sync_req_lsn)
1493   {
1494     ndbrequire(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
1495     signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1496     signal->theData[1] = ptr.i;
1497     signal->theData[2] = (Uint32)(max_req_lsn >> 32);
1498     signal->theData[3] = (Uint32)(max_req_lsn & 0xFFFFFFFF);
1499     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 4);
1500   }
1501   else
1502   {
1503     ptr.p->m_state &= ~(Uint32)Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1504   }
1505 }
1506 
1507 void
process_log_sync_waiters(Signal * signal,Ptr<Logfile_group> ptr)1508 Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1509 {
1510   Local_log_waiter_list
1511     list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1512 
1513   if(list.isEmpty())
1514   {
1515     return;
1516   }
1517 
1518   bool removed= false;
1519   Ptr<Log_waiter> waiter;
1520   list.first(waiter);
1521   Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1522 
1523   if(waiter.p->m_sync_lsn <= ptr.p->m_last_synced_lsn)
1524   {
1525     removed= true;
1526     Uint32 block = waiter.p->m_block;
1527     CallbackPtr & callback = waiter.p->m_callback;
1528     sendCallbackConf(signal, block, callback, logfile_group_id);
1529 
1530     list.releaseFirst(waiter);
1531   }
1532 
1533   if(removed && !list.isEmpty())
1534   {
1535     ptr.p->m_state |= Logfile_group::LG_SYNC_WAITERS_THREAD;
1536     signal->theData[0] = LgmanContinueB::PROCESS_LOG_SYNC_WAITERS;
1537     signal->theData[1] = ptr.i;
1538     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1539   }
1540   else
1541   {
1542     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_SYNC_WAITERS_THREAD;
1543   }
1544 }
1545 
1546 
1547 Uint32*
get_log_buffer(Ptr<Logfile_group> ptr,Uint32 sz)1548 Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
1549 {
1550   GlobalPage *page;
1551   page=m_shared_page_pool.getPtr(ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i);
1552 
1553   Uint32 total_free= ptr.p->m_free_buffer_words;
1554   assert(total_free >= sz);
1555   Uint32 pos= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
1556   Uint32 free= File_formats::UNDO_PAGE_WORDS - pos;
1557 
1558   if(sz <= free)
1559   {
1560 next:
1561     // fits this page wo/ problem
1562     ndbrequire(total_free >= sz);
1563     ptr.p->m_free_buffer_words = total_free - sz;
1564     ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
1565     return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
1566   }
1567 
1568   /**
1569    * It didn't fit page...fill page with a NOOP log entry
1570    */
1571   Uint64 lsn= ptr.p->m_last_lsn - 1;
1572   File_formats::Undofile::Undo_page* undo=
1573     (File_formats::Undofile::Undo_page*)page;
1574   undo->m_page_header.m_page_lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
1575   undo->m_page_header.m_page_lsn_hi = (Uint32)(lsn >> 32);
1576   undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1577 
1578   /**
1579    * Update free space with extra NOOP
1580    */
1581   ndbrequire(ptr.p->m_free_file_words >= free);
1582   ptr.p->m_free_file_words -= free;
1583 
1584   validate_logfile_group(ptr, "get_log_buffer");
1585 
1586   pos= 0;
1587   assert(total_free >= free);
1588   total_free -= free;
1589   page= m_shared_page_pool.getPtr(next_page(ptr.p, PRODUCER));
1590   goto next;
1591 }
1592 
1593 Uint32
next_page(Logfile_group * ptrP,Uint32 i)1594 Lgman::next_page(Logfile_group* ptrP, Uint32 i)
1595 {
1596   Uint32 page_ptr_i= ptrP->m_pos[i].m_current_pos.m_ptr_i;
1597   Uint32 left_in_range= ptrP->m_pos[i].m_current_page.m_idx;
1598   if(left_in_range > 0)
1599   {
1600     ptrP->m_pos[i].m_current_page.m_idx = left_in_range - 1;
1601     ptrP->m_pos[i].m_current_pos.m_ptr_i = page_ptr_i + 1;
1602     return page_ptr_i + 1;
1603   }
1604   else
1605   {
1606     Lgman::Page_map map(m_data_buffer_pool, ptrP->m_buffer_pages);
1607     Uint32 pos= (ptrP->m_pos[i].m_current_page.m_ptr_i + 2) % map.getSize();
1608     Lgman::Page_map::Iterator it;
1609     map.position(it, pos);
1610 
1611     union {
1612       Uint32 tmp[2];
1613       Lgman::Buffer_idx range;
1614     };
1615 
1616     tmp[0] = *it.data; map.next(it);
1617     tmp[1] = *it.data;
1618 
1619     ptrP->m_pos[i].m_current_page.m_ptr_i = pos;           // New index in map
1620     ptrP->m_pos[i].m_current_page.m_idx = range.m_idx - 1; // Free pages
1621     ptrP->m_pos[i].m_current_pos.m_ptr_i = range.m_ptr_i;  // Current page
1622     // No need to set ptrP->m_current_pos.m_idx, that is set "in higher"-func
1623     return range.m_ptr_i;
1624   }
1625 }
1626 
1627 int
get_log_buffer(Signal * signal,Uint32 sz,SimulatedBlock::CallbackPtr * callback)1628 Logfile_client::get_log_buffer(Signal* signal, Uint32 sz,
1629 			       SimulatedBlock::CallbackPtr* callback)
1630 {
1631   sz += 2; // lsn
1632   Lgman::Logfile_group key;
1633   key.m_logfile_group_id= m_logfile_group_id;
1634   Ptr<Lgman::Logfile_group> ptr;
1635   if(m_lgman->m_logfile_group_hash.find(ptr, key))
1636   {
1637     Uint32 callback_buffer = ptr.p->m_callback_buffer_words;
1638     Uint32 free_buffer = ptr.p->m_free_buffer_words;
1639     if (free_buffer >= (sz + callback_buffer + FREE_BUFFER_MARGIN) &&
1640         ptr.p->m_log_buffer_waiters.isEmpty())
1641     {
1642       ptr.p->m_callback_buffer_words = callback_buffer + sz;
1643       return 1;
1644     }
1645 
1646     bool empty= false;
1647     {
1648       Ptr<Lgman::Log_waiter> wait;
1649       Lgman::Local_log_waiter_list
1650 	list(m_lgman->m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1651 
1652       empty= list.isEmpty();
1653       if(!list.seize(wait))
1654       {
1655 	return -1;
1656       }
1657 
1658       wait.p->m_size= sz;
1659       wait.p->m_block= m_block;
1660       memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::CallbackPtr));
1661     }
1662 
1663     return 0;
1664   }
1665   return -1;
1666 }
1667 
1668 NdbOut&
operator <<(NdbOut & out,const Lgman::Buffer_idx & pos)1669 operator<<(NdbOut& out, const Lgman::Buffer_idx& pos)
1670 {
1671   out << "[ "
1672       << pos.m_ptr_i << " "
1673       << pos.m_idx << " ]";
1674   return out;
1675 }
1676 
1677 NdbOut&
operator <<(NdbOut & out,const Lgman::Logfile_group::Position & pos)1678 operator<<(NdbOut& out, const Lgman::Logfile_group::Position& pos)
1679 {
1680   out << "[ ("
1681       << pos.m_current_page.m_ptr_i << " "
1682       << pos.m_current_page.m_idx << ") ("
1683       << pos.m_current_pos.m_ptr_i << " "
1684       << pos.m_current_pos.m_idx << ") ]";
1685   return out;
1686 }
1687 
/**
 * Flush job of a logfile group: write log buffer pages to the undo files.
 *
 * All buffer pages between the consumer and the producer position are
 * handed to write_log_pages(), range by range, until the consumer has
 * caught up with the producer or write_log_pages() accepts fewer pages
 * than offered.
 *
 * If consumer and producer are on the same buffer page on entry there
 * are no complete pages to write. The job then normally just
 * reschedules itself with a delay. Only if someone is waiting for
 * buffer space, the current page is not empty, the force level has
 * reached 2 and no file writes are outstanding, is the partly filled
 * page closed so that it can be written.
 *
 * Unless the logfile group is being dropped (LG_DROPPING) the job always
 * re-arms itself with a CONTINUEB/FLUSH_LOG signal; when dropping it
 * clears LG_FLUSH_THREAD and stops.
 *
 * @param signal  signal object used for the signals sent
 * @param ptr     logfile group to flush
 * @param force   force level as carried in the FLUSH_LOG signal; it is
 *                passed on incremented by one on every idle round and
 *                restarted at 0 after a round that wrote pages
 */
void
Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
{
  // Work on local copies, the consumer is stored back at the end
  Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
  Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];

  jamEntry();

  if (consumer.m_current_page == producer.m_current_page)
  {
    /**
     * Consumer has caught up with producer,
     *   there is no complete page to write
     */
    jam();
    Buffer_idx pos = producer.m_current_pos;

#if 0
    if (force)
    {
      ndbout_c("force: %d ptr.p->m_file_pos[HEAD].m_ptr_i= %x",
	       force, ptr.p->m_file_pos[HEAD].m_ptr_i);
      ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
	       consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
	       producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
    }
#endif
    if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
    {
      jam();

      if (ptr.p->m_log_buffer_waiters.isEmpty() || pos.m_idx == 0)
      {
        // Nobody waits for buffer, or nothing on the page: no need to force
        jam();
	force =  0;
      }
      else if (ptr.p->m_free_buffer_words < FREE_BUFFER_MARGIN)
      {
        // Waiters and (almost) no free buffer: go to force level 2 at once
        jam();
        force = 2;
      }

      if (force < 2 || ptr.p->m_outstanding_fs)
      {
        /**
         * Not (yet) forcing, or file writes still outstanding:
         *   come back later, sooner if force is set
         */
        jam();
	signal->theData[0] = LgmanContinueB::FLUSH_LOG;
	signal->theData[1] = ptr.i;
	signal->theData[2] = force + 1;
	sendSignalWithDelay(reference(), GSN_CONTINUEB, signal,
			    force ? 10 : 100, 3);
	return;
      }
      else
      {
        /**
         * Forced flush: close the partly filled page. It is stamped
         *   with the last lsn and its fill level, and the producer
         *   moves on to the next page
         */
        jam();
	GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);

	Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;

	ndbout_c("force flush %d %d outstanding: %u isEmpty(): %u",
                 pos.m_idx, ptr.p->m_free_buffer_words,
                 ptr.p->m_outstanding_fs,
                 ptr.p->m_log_buffer_waiters.isEmpty());

	ndbrequire(pos.m_idx); // don't flush empty page...
	Uint64 lsn= ptr.p->m_last_lsn - 1;

	File_formats::Undofile::Undo_page* undo=
	  (File_formats::Undofile::Undo_page*)page;
	undo->m_page_header.m_page_lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
	undo->m_page_header.m_page_lsn_hi = (Uint32)(lsn >> 32);
	undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;

	/**
	 * Update free space with extra NOOP
	 */
	ndbrequire(ptr.p->m_free_file_words >= free);
	ndbrequire(ptr.p->m_free_buffer_words > free);
	ptr.p->m_free_file_words -= free;
	ptr.p->m_free_buffer_words -= free;

	validate_logfile_group(ptr, "force_log_flush");

	next_page(ptr.p, PRODUCER);
	ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
	// Refresh the local copy, the write loop below compares against it
	producer = ptr.p->m_pos[PRODUCER];
	// break through
      }
    }
    else
    {
      // Logfile group is being dropped: the flush job ends here
      jam();
      ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
      return;
    }
  }

  /**
   * Write pages until consumer reaches producer, or until
   *   write_log_pages() takes fewer pages than offered (full)
   */
  bool full= false;
  Uint32 tot= 0;
  while(!(consumer.m_current_page == producer.m_current_page) && !full)
  {
    jam();
    validate_logfile_group(ptr, "before flush log");

    Uint32 cnt; // pages written
    Uint32 page= consumer.m_current_pos.m_ptr_i;
    if(consumer.m_current_page.m_ptr_i == producer.m_current_page.m_ptr_i)
    {
      /**
       * In same range
       */
      jam();

      if(producer.m_current_pos.m_ptr_i > page)
      {
        /**
         * producer ahead of consumer in same chunk
         */
	jam();
	// Offer the pages in front of the producer's current page
	Uint32 tmp= producer.m_current_pos.m_ptr_i - page;
	cnt= write_log_pages(signal, ptr, page, tmp);
	assert(cnt <= tmp);

	consumer.m_current_pos.m_ptr_i += cnt;
	consumer.m_current_page.m_idx -= cnt;
	full= (tmp > cnt);
      }
      else
      {
        /**
         * consumer ahead of producer in same chunk
         */
	// Offer the rest of the range: current page + pages left in range
	Uint32 tmp= consumer.m_current_page.m_idx + 1;
	cnt= write_log_pages(signal, ptr, page, tmp);
	assert(cnt <= tmp);

	if(cnt == tmp)
	{
	  jam();
	  /**
	   * Entire chunk is written
	   *   move to next
	   */
	  // With 0 pages left in range, next_page() steps to the next range
	  ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
	  next_page(ptr.p, CONSUMER);
	  consumer = ptr.p->m_pos[CONSUMER];
 	}
	else
	{
	  jam();
	  /**
	   * Failed to write entire chunk...
	   */
	  full= true;
	  consumer.m_current_page.m_idx -= cnt;
	  consumer.m_current_pos.m_ptr_i += cnt;
	}
      }
    }
    else
    {
      /**
       * Consumer and producer are in different ranges:
       *   offer the rest of the consumer's range
       */
      Uint32 tmp= consumer.m_current_page.m_idx + 1;
      cnt= write_log_pages(signal, ptr, page, tmp);
      assert(cnt <= tmp);

      if(cnt == tmp)
      {
	jam();
	/**
	 * Entire chunk is written
	 *   move to next
	 */
	// With 0 pages left in range, next_page() steps to the next range
	ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
	next_page(ptr.p, CONSUMER);
	consumer = ptr.p->m_pos[CONSUMER];
      }
      else
      {
	jam();
	/**
	 * Failed to write entire chunk...
	 */
	full= true;
	consumer.m_current_page.m_idx -= cnt;
	consumer.m_current_pos.m_ptr_i += cnt;
      }
    }

    tot += cnt;
    if(cnt)
      validate_logfile_group(ptr, " after flush_log");
  }

  // Store the consumer position reached
  ptr.p->m_pos[CONSUMER]= consumer;

  if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
  {
    // Go another round right away, starting over at force level 0
    signal->theData[0] = LgmanContinueB::FLUSH_LOG;
    signal->theData[1] = ptr.i;
    signal->theData[2] = 0;
    sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
  }
  else
  {
    // Logfile group is being dropped: the flush job ends here
    ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
  }
}
1891 
1892 void
process_log_buffer_waiters(Signal * signal,Ptr<Logfile_group> ptr)1893 Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1894 {
1895   Uint32 free_buffer= ptr.p->m_free_buffer_words;
1896   Uint32 callback_buffer = ptr.p->m_callback_buffer_words;
1897   Local_log_waiter_list
1898     list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1899 
1900   if (list.isEmpty())
1901   {
1902     jam();
1903     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1904     return;
1905   }
1906 
1907   bool removed= false;
1908   Ptr<Log_waiter> waiter;
1909   list.first(waiter);
1910   Uint32 sz  = waiter.p->m_size;
1911   Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1912   if (sz + callback_buffer + FREE_BUFFER_MARGIN < free_buffer)
1913   {
1914     jam();
1915     removed= true;
1916     Uint32 block = waiter.p->m_block;
1917     CallbackPtr & callback = waiter.p->m_callback;
1918     ptr.p->m_callback_buffer_words += sz;
1919     sendCallbackConf(signal, block, callback, logfile_group_id);
1920 
1921     list.releaseFirst(waiter);
1922   }
1923 
1924   if (removed && !list.isEmpty())
1925   {
1926     jam();
1927     ptr.p->m_state |= Logfile_group::LG_WAITERS_THREAD;
1928     signal->theData[0] = LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS;
1929     signal->theData[1] = ptr.i;
1930     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1931   }
1932   else
1933   {
1934     jam();
1935     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1936   }
1937 }
1938 
/**
 * Debug aid: if set to a non-zero value, write_log_pages() sends its
 *   FSWRITEREQ with sendSignalWithDelay() using this value as the delay,
 *   instead of sending it directly
 */
#define REALLY_SLOW_FS 0
1940 
1941 Uint32
write_log_pages(Signal * signal,Ptr<Logfile_group> ptr,Uint32 pageId,Uint32 in_pages)1942 Lgman::write_log_pages(Signal* signal, Ptr<Logfile_group> ptr,
1943 		       Uint32 pageId, Uint32 in_pages)
1944 {
1945   assert(in_pages);
1946   Ptr<Undofile> filePtr;
1947   Buffer_idx head= ptr.p->m_file_pos[HEAD];
1948   Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1949   m_file_pool.getPtr(filePtr, head.m_ptr_i);
1950 
1951   if(filePtr.p->m_online.m_outstanding > 0)
1952   {
1953     jam();
1954     return 0;
1955   }
1956 
1957   Uint32 sz= filePtr.p->m_file_size - 1; // skip zero
1958   Uint32 max, pages= in_pages;
1959 
1960   if(!(head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx))
1961   {
1962     max= sz - head.m_idx;
1963   }
1964   else
1965   {
1966     max= tail.m_idx - head.m_idx;
1967   }
1968 
1969   FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
1970   req->filePointer = filePtr.p->m_fd;
1971   req->userReference = reference();
1972   req->userPointer = filePtr.i;
1973   req->varIndex = 1+head.m_idx; // skip zero page
1974   req->numberOfPages = pages;
1975   req->data.pageData[0] = pageId;
1976   req->operationFlag = 0;
1977   FsReadWriteReq::setFormatFlag(req->operationFlag,
1978 				FsReadWriteReq::fsFormatSharedPage);
1979 
1980   if(max > pages)
1981   {
1982     jam();
1983     max= pages;
1984     head.m_idx += max;
1985     ptr.p->m_file_pos[HEAD] = head;
1986 
1987     if (REALLY_SLOW_FS)
1988       sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
1989 			  FsReadWriteReq::FixedLength + 1);
1990     else
1991       sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
1992 		 FsReadWriteReq::FixedLength + 1, JBA);
1993 
1994     ptr.p->m_outstanding_fs++;
1995     filePtr.p->m_online.m_outstanding = max;
1996     filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
1997 
1998     File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*)
1999       m_shared_page_pool.getPtr(pageId + max - 1);
2000     Uint64 lsn = 0;
2001     lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
2002     lsn += page->m_page_header.m_page_lsn_lo;
2003 
2004     filePtr.p->m_online.m_lsn = lsn;  // Store last writereq lsn on file
2005     ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group
2006   }
2007   else
2008   {
2009     jam();
2010     req->numberOfPages = max;
2011     FsReadWriteReq::setSyncFlag(req->operationFlag, 1);
2012 
2013     if (REALLY_SLOW_FS)
2014       sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
2015 			  FsReadWriteReq::FixedLength + 1);
2016     else
2017       sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
2018 		 FsReadWriteReq::FixedLength + 1, JBA);
2019 
2020     ptr.p->m_outstanding_fs++;
2021     filePtr.p->m_online.m_outstanding = max;
2022     filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
2023 
2024     File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*)
2025       m_shared_page_pool.getPtr(pageId + max - 1);
2026     Uint64 lsn = 0;
2027     lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
2028     lsn += page->m_page_header.m_page_lsn_lo;
2029 
2030     filePtr.p->m_online.m_lsn = lsn;  // Store last writereq lsn on file
2031     ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group
2032 
2033     Ptr<Undofile> next = filePtr;
2034     Local_undofile_list files(m_file_pool, ptr.p->m_files);
2035     if(!files.next(next))
2036     {
2037       jam();
2038       files.first(next);
2039     }
2040     ndbout_c("changing file from %d to %d", filePtr.i, next.i);
2041     filePtr.p->m_state |= Undofile::FS_MOVE_NEXT;
2042     next.p->m_state &= ~(Uint32)Undofile::FS_EMPTY;
2043 
2044     head.m_idx= 0;
2045     head.m_ptr_i= next.i;
2046     ptr.p->m_file_pos[HEAD] = head;
2047     if(max < pages)
2048       max += write_log_pages(signal, ptr, pageId + max, pages - max);
2049   }
2050 
2051   assert(max);
2052   return max;
2053 }
2054 
/**
 * Signal handler for FSWRITEREF.
 *
 * Hands the signal to SimulatedBlock::execFSWRITEREF() and then
 * unconditionally fails ndbrequire(false); no recovery is attempted here.
 */
void
Lgman::execFSWRITEREF(Signal* signal)
{
  jamEntry();
  SimulatedBlock::execFSWRITEREF(signal);
  ndbrequire(false);
}
2062 
2063 void
execFSWRITECONF(Signal * signal)2064 Lgman::execFSWRITECONF(Signal* signal)
2065 {
2066   jamEntry();
2067   client_lock(number(), __LINE__);
2068   FsConf * conf = (FsConf*)signal->getDataPtr();
2069   Ptr<Undofile> ptr;
2070   m_file_pool.getPtr(ptr, conf->userPointer);
2071 
2072   ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
2073   ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
2074 
2075   Ptr<Logfile_group> lg_ptr;
2076   m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
2077 
2078   Uint32 cnt= lg_ptr.p->m_outstanding_fs;
2079   ndbrequire(cnt);
2080 
2081   if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
2082   {
2083     Uint32 tot= 0;
2084     Uint64 lsn = 0;
2085     {
2086       Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2087       while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
2088       {
2089 	Uint32 state= ptr.p->m_state;
2090 	Uint32 pages= ptr.p->m_online.m_outstanding;
2091 	ndbrequire(pages);
2092 	ptr.p->m_online.m_outstanding= 0;
2093 	ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
2094 	tot += pages;
2095 	cnt--;
2096 
2097 	lsn = ptr.p->m_online.m_lsn;
2098 
2099 	if((state & Undofile::FS_MOVE_NEXT) && !files.next(ptr))
2100 	  files.first(ptr);
2101       }
2102     }
2103 
2104     ndbassert(tot);
2105     lg_ptr.p->m_outstanding_fs = cnt;
2106     lg_ptr.p->m_free_buffer_words += (tot * File_formats::UNDO_PAGE_WORDS);
2107     lg_ptr.p->m_next_reply_ptr_i = ptr.i;
2108     lg_ptr.p->m_last_synced_lsn = lsn;
2109 
2110     if(! (lg_ptr.p->m_state & Logfile_group::LG_SYNC_WAITERS_THREAD))
2111     {
2112       process_log_sync_waiters(signal, lg_ptr);
2113     }
2114 
2115     if(! (lg_ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
2116     {
2117       process_log_buffer_waiters(signal, lg_ptr);
2118     }
2119   }
2120   else
2121   {
2122     ndbout_c("miss matched writes");
2123   }
2124   client_unlock(number(), __LINE__);
2125 
2126   return;
2127 }
2128 
/**
 * Signal handler for LCP_FRAG_ORD.
 *
 * Runs exec_lcp_frag_ord() with this block as the sending block, between
 * client_lock() and client_unlock().
 */
void
Lgman::execLCP_FRAG_ORD(Signal* signal)
{
  jamEntry();
  client_lock(number(), __LINE__);
  exec_lcp_frag_ord(signal, this);
  client_unlock(number(), __LINE__);
}
2137 
2138 void
exec_lcp_frag_ord(Signal * signal,SimulatedBlock * client_block)2139 Lgman::exec_lcp_frag_ord(Signal* signal, SimulatedBlock* client_block)
2140 {
2141   jamEntry();
2142 
2143   LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
2144   Uint32 lcp_id= ord->lcpId;
2145   Uint32 frag_id = ord->fragmentId;
2146   Uint32 table_id = ord->tableId;
2147 
2148   Ptr<Logfile_group> ptr;
2149   m_logfile_group_list.first(ptr);
2150 
2151   Uint32 entry= lcp_id == m_latest_lcp ?
2152     File_formats::Undofile::UNDO_LCP : File_formats::Undofile::UNDO_LCP_FIRST;
2153   if(!ptr.isNull() && ! (ptr.p->m_state & Logfile_group::LG_CUT_LOG_THREAD))
2154   {
2155     jam();
2156     ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
2157     signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
2158     signal->theData[1] = ptr.i;
2159     client_block->sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2160   }
2161 
2162   if(!ptr.isNull() && ptr.p->m_last_lsn)
2163   {
2164     Uint32 undo[3];
2165     undo[0] = lcp_id;
2166     undo[1] = (table_id << 16) | frag_id;
2167     undo[2] = (entry << 16 ) | (sizeof(undo) >> 2);
2168 
2169     Uint64 last_lsn= m_last_lsn;
2170 
2171     if(ptr.p->m_last_lsn == last_lsn
2172 #ifdef VM_TRACE
2173        && ((rand() % 100) > 50)
2174 #endif
2175        )
2176     {
2177       undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
2178       Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
2179       memcpy(dst, undo, sizeof(undo));
2180       ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
2181       ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
2182     }
2183     else
2184     {
2185       Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
2186       * dst++ = (Uint32)(last_lsn >> 32);
2187       * dst++ = (Uint32)(last_lsn & 0xFFFFFFFF);
2188       memcpy(dst, undo, sizeof(undo));
2189       ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
2190       ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
2191     }
2192     ptr.p->m_last_lcp_lsn = last_lsn;
2193     m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
2194 
2195     validate_logfile_group(ptr, "execLCP_FRAG_ORD");
2196   }
2197 
2198   while(!ptr.isNull())
2199   {
2200     if (ptr.p->m_last_lsn)
2201     {
2202       /**
2203        * First LCP_FRAGORD for each LCP, sets tail pos
2204        */
2205       if(m_latest_lcp != lcp_id)
2206       {
2207 	ptr.p->m_tail_pos[0] = ptr.p->m_tail_pos[1];
2208 	ptr.p->m_tail_pos[1] = ptr.p->m_tail_pos[2];
2209 	ptr.p->m_tail_pos[2] = ptr.p->m_file_pos[HEAD];
2210       }
2211 
2212       if(0)
2213 	ndbout_c
2214 	  ("execLCP_FRAG_ORD (%d %d) (%d %d) (%d %d) free pages: %ld",
2215 	   ptr.p->m_tail_pos[0].m_ptr_i, ptr.p->m_tail_pos[0].m_idx,
2216 	   ptr.p->m_tail_pos[1].m_ptr_i, ptr.p->m_tail_pos[1].m_idx,
2217 	   ptr.p->m_tail_pos[2].m_ptr_i, ptr.p->m_tail_pos[2].m_idx,
2218 	   (long) (ptr.p->m_free_file_words / File_formats::UNDO_PAGE_WORDS));
2219     }
2220     m_logfile_group_list.next(ptr);
2221   }
2222 
2223   m_latest_lcp = lcp_id;
2224 }
2225 
2226 void
execEND_LCP_REQ(Signal * signal)2227 Lgman::execEND_LCP_REQ(Signal* signal)
2228 {
2229   EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
2230   ndbrequire(m_latest_lcp == req->backupId);
2231   m_end_lcp_senderdata = req->senderData;
2232 
2233   Ptr<Logfile_group> ptr;
2234   m_logfile_group_list.first(ptr);
2235   bool wait= false;
2236   while(!ptr.isNull())
2237   {
2238     Uint64 lcp_lsn = ptr.p->m_last_lcp_lsn;
2239     if(ptr.p->m_last_synced_lsn < lcp_lsn)
2240     {
2241       wait= true;
2242       if(signal->getSendersBlockRef() != reference())
2243       {
2244         D("Logfile_client - execEND_LCP_REQ");
2245 	Logfile_client tmp(this, this, ptr.p->m_logfile_group_id);
2246 	Logfile_client::Request req;
2247 	req.m_callback.m_callbackData = ptr.i;
2248 	req.m_callback.m_callbackIndex = ENDLCP_CALLBACK;
2249 	ndbrequire(tmp.sync_lsn(signal, lcp_lsn, &req, 0) == 0);
2250       }
2251     }
2252     else
2253     {
2254       ptr.p->m_last_lcp_lsn = 0;
2255     }
2256     m_logfile_group_list.next(ptr);
2257   }
2258 
2259   if(wait)
2260   {
2261     return;
2262   }
2263 
2264   EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
2265   conf->senderData = m_end_lcp_senderdata;
2266   conf->senderRef = reference();
2267   sendSignal(DBLQH_REF, GSN_END_LCP_CONF,
2268              signal, EndLcpConf::SignalLength, JBB);
2269 }
2270 
/**
 * Callback that re-runs END_LCP_REQ processing.
 *
 * Rebuilds an EndLcpReq in the signal from the saved m_latest_lcp and
 * m_end_lcp_senderdata and calls execEND_LCP_REQ() directly.
 * Neither ptr nor res is used.
 */
void
Lgman::endlcp_callback(Signal* signal, Uint32 ptr, Uint32 res)
{
  EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
  req->backupId = m_latest_lcp;
  req->senderData = m_end_lcp_senderdata;
  execEND_LCP_REQ(signal);
}
2279 
/**
 * Move the on-disk TAIL of a logfile group towards m_tail_pos[0] and
 * credit the released pages to m_free_file_words.
 *
 * At most one file is handled per call. If the target lies in another
 * file (or behind TAIL in the same file), the rest of the current file is
 * released, TAIL jumps to the start of the next non-empty file and the
 * function reschedules itself for the same group. Once a group is done,
 * its LG_CUT_LOG_THREAD flag is cleared and the next group in
 * m_logfile_group_list (if any) is scheduled.
 *
 * @param signal  Signal object used for the CONTINUEB(CUT_LOG_TAIL) send
 * @param ptr     Logfile group to process
 */
void
Lgman::cut_log_tail(Signal* signal, Ptr<Logfile_group> ptr)
{
  bool done= true;
  if (likely(ptr.p->m_last_lsn))
  {
    Buffer_idx tmp= ptr.p->m_tail_pos[0];     // position to cut up to
    Buffer_idx tail= ptr.p->m_file_pos[TAIL]; // current tail

    Ptr<Undofile> filePtr;
    m_file_pool.getPtr(filePtr, tail.m_ptr_i);

    if(!(tmp == tail))
    {
      Uint32 free;
      if(tmp.m_ptr_i == tail.m_ptr_i && tail.m_idx < tmp.m_idx)
      {
	// Target is further ahead in the same file: release the gap
	free= tmp.m_idx - tail.m_idx;
	ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;
	ptr.p->m_file_pos[TAIL] = tmp;
      }
      else
      {
	// Release the remainder of the current file
	free= filePtr.p->m_file_size - tail.m_idx - 1;
	ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;

	// Find the next file not marked FS_EMPTY, wrapping to the first
	Ptr<Undofile> next = filePtr;
	Local_undofile_list files(m_file_pool, ptr.p->m_files);
	while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY))
	  ndbrequire(next.i != filePtr.i);
	if(next.isNull())
	{
	  jam();
	  files.first(next);
	  while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next))
	    ndbrequire(next.i != filePtr.i);
	}

	tmp.m_idx= 0;
	tmp.m_ptr_i= next.i;
	ptr.p->m_file_pos[TAIL] = tmp;
	done= false; // continue with this group in a later CONTINUEB
      }
    }

    validate_logfile_group(ptr, "cut log");
  }

  if (done)
  {
    // Finished with this group; ptr now refers to the next one (or null)
    ptr.p->m_state &= ~(Uint32)Logfile_group::LG_CUT_LOG_THREAD;
    m_logfile_group_list.next(ptr);
  }

  if(!done || !ptr.isNull())
  {
    ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
    signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
    signal->theData[1] = ptr.i;
    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
  }
}
2342 
2343 void
execSUB_GCP_COMPLETE_REP(Signal * signal)2344 Lgman::execSUB_GCP_COMPLETE_REP(Signal* signal)
2345 {
2346   jamEntry();
2347 
2348   Ptr<Logfile_group> ptr;
2349   m_logfile_group_list.first(ptr);
2350 
2351   /**
2352    * Filter all logfile groups in parallell
2353    */
2354   return; // NOT IMPLETMENT YET
2355 
2356   signal->theData[0] = LgmanContinueB::FILTER_LOG;
2357   while(!ptr.isNull())
2358   {
2359     signal->theData[1] = ptr.i;
2360     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2361     m_logfile_group_list.next(ptr);
2362   }
2363 }
2364 
2365 int
alloc_log_space(Uint32 ref,Uint32 words)2366 Lgman::alloc_log_space(Uint32 ref, Uint32 words)
2367 {
2368   ndbrequire(words);
2369   words += 2; // lsn
2370   Logfile_group key;
2371   key.m_logfile_group_id= ref;
2372   Ptr<Logfile_group> ptr;
2373   if(m_logfile_group_hash.find(ptr, key) &&
2374      ptr.p->m_free_file_words >= (words + (4 * File_formats::UNDO_PAGE_WORDS)))
2375   {
2376     ptr.p->m_free_file_words -= words;
2377     validate_logfile_group(ptr, "alloc_log_space");
2378     return 0;
2379   }
2380 
2381   if(ptr.isNull())
2382   {
2383     return -1;
2384   }
2385 
2386   return 1501;
2387 }
2388 
2389 int
free_log_space(Uint32 ref,Uint32 words)2390 Lgman::free_log_space(Uint32 ref, Uint32 words)
2391 {
2392   ndbrequire(words);
2393   Logfile_group key;
2394   key.m_logfile_group_id= ref;
2395   Ptr<Logfile_group> ptr;
2396   if(m_logfile_group_hash.find(ptr, key))
2397   {
2398     ptr.p->m_free_file_words += (words + 2);
2399     validate_logfile_group(ptr, "free_log_space");
2400     return 0;
2401   }
2402   ndbrequire(false);
2403   return -1;
2404 }
2405 
2406 Uint64
add_entry(const Change * src,Uint32 cnt)2407 Logfile_client::add_entry(const Change* src, Uint32 cnt)
2408 {
2409   Uint32 i, tot= 0;
2410   for(i= 0; i<cnt; i++)
2411   {
2412     tot += src[i].len;
2413   }
2414 
2415   Uint32 *dst;
2416   Uint64 last_lsn= m_lgman->m_last_lsn;
2417   {
2418     Lgman::Logfile_group key;
2419     key.m_logfile_group_id= m_logfile_group_id;
2420     Ptr<Lgman::Logfile_group> ptr;
2421     if(m_lgman->m_logfile_group_hash.find(ptr, key))
2422     {
2423       Uint32 callback_buffer = ptr.p->m_callback_buffer_words;
2424       Uint64 last_lsn_filegroup= ptr.p->m_last_lsn;
2425       if(last_lsn_filegroup == last_lsn
2426 #ifdef VM_TRACE
2427 	 && ((rand() % 100) > 50)
2428 #endif
2429 	 )
2430       {
2431 	dst= m_lgman->get_log_buffer(ptr, tot);
2432 	for(i= 0; i<cnt; i++)
2433 	{
2434 	  memcpy(dst, src[i].ptr, 4*src[i].len);
2435 	  dst += src[i].len;
2436 	}
2437 	* (dst - 1) |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
2438 	ptr.p->m_free_file_words += 2;
2439 	m_lgman->validate_logfile_group(ptr);
2440       }
2441       else
2442       {
2443 	dst= m_lgman->get_log_buffer(ptr, tot + 2);
2444 	* dst++ = (Uint32)(last_lsn >> 32);
2445 	* dst++ = (Uint32)(last_lsn & 0xFFFFFFFF);
2446 	for(i= 0; i<cnt; i++)
2447 	{
2448 	  memcpy(dst, src[i].ptr, 4*src[i].len);
2449 	  dst += src[i].len;
2450 	}
2451       }
2452       /**
2453        * for callback_buffer, always allocats 2 extra...
2454        *   not knowing if LSN must be added or not
2455        */
2456       tot += 2;
2457 
2458       if (unlikely(! (tot <= callback_buffer)))
2459       {
2460         abort();
2461       }
2462       ptr.p->m_callback_buffer_words = callback_buffer - tot;
2463     }
2464 
2465     m_lgman->m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
2466 
2467     return last_lsn;
2468   }
2469 }
2470 
2471 void
execSTART_RECREQ(Signal * signal)2472 Lgman::execSTART_RECREQ(Signal* signal)
2473 {
2474   m_latest_lcp = signal->theData[0];
2475 
2476   Ptr<Logfile_group> ptr;
2477   m_logfile_group_list.first(ptr);
2478 
2479   if(ptr.i != RNIL)
2480   {
2481     infoEvent("Applying undo to LCP: %d", m_latest_lcp);
2482     ndbout_c("Applying undo to LCP: %d", m_latest_lcp);
2483     find_log_head(signal, ptr);
2484     return;
2485   }
2486 
2487   signal->theData[0] = reference();
2488   sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
2489 }
2490 
/**
 * Drive the search for the log head of one logfile group.
 *
 * Called repeatedly for the same group:
 *  - a group without any files is put online at once and the next group
 *    is scheduled through CONTINUEB(FIND_LOG_HEAD);
 *  - while files remain in m_meta_files, the first data page of the first
 *    such file is read (one file per call);
 *  - when m_meta_files is empty, a binary search for the head page is
 *    started in the last file of m_files.
 *
 * @param signal  Signal object used for the outgoing request
 * @param ptr     Logfile group; must be in state LG_STARTING or LG_SORTING
 */
void
Lgman::find_log_head(Signal* signal, Ptr<Logfile_group> ptr)
{
  ndbrequire(ptr.p->m_state &
             (Logfile_group::LG_STARTING | Logfile_group::LG_SORTING));

  if(ptr.p->m_meta_files.isEmpty() && ptr.p->m_files.isEmpty())
  {
    jam();
    /**
     * Logfile_group wo/ any files
     */
    ptr.p->m_state &= ~(Uint32)Logfile_group::LG_STARTING;
    ptr.p->m_state |= Logfile_group::LG_ONLINE;
    // Continue with the next group; ptr.i is passed on even if ptr is null
    m_logfile_group_list.next(ptr);
    signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
    signal->theData[1] = ptr.i;
    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
    return;
  }

  ptr.p->m_state = Logfile_group::LG_SORTING;

  /**
   * Read first page from each undofile (1 file at a time...)
   */
  Local_undofile_list files(m_file_pool, ptr.p->m_meta_files);
  Ptr<Undofile> file_ptr;
  files.first(file_ptr);

  if(!file_ptr.isNull())
  {
    /**
     * Use log buffer memory when reading
     */
    Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
    // m_outstanding carries the buffer page id until the read completes
    file_ptr.p->m_online.m_outstanding= page_id;

    FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
    req->filePointer = file_ptr.p->m_fd;
    req->userReference = reference();
    req->userPointer = file_ptr.i;
    req->varIndex = 1; // skip zero page
    req->numberOfPages = 1;
    req->data.pageData[0] = page_id;
    req->operationFlag = 0;
    FsReadWriteReq::setFormatFlag(req->operationFlag,
				  FsReadWriteReq::fsFormatSharedPage);

    sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
	       FsReadWriteReq::FixedLength + 1, JBA);

    ptr.p->m_outstanding_fs++;
    file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
    return;
  }
  else
  {
    /**
     * All files have read first page
     *   and m_files is sorted according to lsn
     */
    ndbrequire(!ptr.p->m_files.isEmpty());
    Local_undofile_list read_files(m_file_pool, ptr.p->m_files);
    read_files.last(file_ptr);


    /**
     * Init binary search
     *
     * During the search the position fields are reused:
     *   m_file_pos[TAIL].m_idx  - left bound
     *   m_file_pos[HEAD].m_idx  - right bound
     *   m_file_pos[HEAD].m_ptr_i - page currently being probed
     */
    ptr.p->m_state = Logfile_group::LG_SEARCHING;
    file_ptr.p->m_state = Undofile::FS_SEARCHING;
    ptr.p->m_file_pos[TAIL].m_idx = 1;                   // left page
    ptr.p->m_file_pos[HEAD].m_idx = file_ptr.p->m_file_size;
    ptr.p->m_file_pos[HEAD].m_ptr_i = ((file_ptr.p->m_file_size - 1) >> 1) + 1;

    Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
    // m_outstanding carries the buffer page id until the read completes
    file_ptr.p->m_online.m_outstanding= page_id;

    FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
    req->filePointer = file_ptr.p->m_fd;
    req->userReference = reference();
    req->userPointer = file_ptr.i;
    req->varIndex = ptr.p->m_file_pos[HEAD].m_ptr_i;
    req->numberOfPages = 1;
    req->data.pageData[0] = page_id;
    req->operationFlag = 0;
    FsReadWriteReq::setFormatFlag(req->operationFlag,
				  FsReadWriteReq::fsFormatSharedPage);

    sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
	       FsReadWriteReq::FixedLength + 1, JBA);

    ptr.p->m_outstanding_fs++;
    file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
    return;
  }
}
2589 
/**
 * Signal handler for FSREADCONF: a page read from an undo file is done.
 *
 * Depending on the state of the file:
 *  - FS_EXECUTING: a read issued while running the undo log. If the file
 *    is the one expected to reply next, completed files are consumed in
 *    order (walking the file list backwards) and the pages are added to
 *    the PRODUCER position.
 *  - FS_SEARCHING: one step of the binary search for the log head; the
 *    LSN of the page read is handed to find_log_head_in_file().
 *  - FS_SORTING: the first page of a file has been read; the file is
 *    moved from m_meta_files into m_files, ordered by LSN, and
 *    find_log_head() is called again.
 *  - any other state is a fatal error.
 *
 * The whole handler runs between client_lock() and client_unlock().
 */
void
Lgman::execFSREADCONF(Signal* signal)
{
  jamEntry();
  client_lock(number(), __LINE__);

  Ptr<Undofile> ptr;
  Ptr<Logfile_group> lg_ptr;
  FsConf* conf = (FsConf*)signal->getDataPtr();

  m_file_pool.getPtr(ptr, conf->userPointer);
  m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);

  ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
  ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;

  Uint32 cnt= lg_ptr.p->m_outstanding_fs;
  ndbrequire(cnt);

  if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING)
  {
    jam();

    // Replies are consumed strictly in the order the reads were issued
    if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
    {
      Uint32 tot= 0;
      Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
      while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
      {
	Uint32 state= ptr.p->m_state;
	Uint32 pages= ptr.p->m_online.m_outstanding;
	ndbrequire(pages);
	ptr.p->m_online.m_outstanding= 0;
	ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
	tot += pages;
	cnt--;

	// Reader moved on to the previous file (wrapping to the last)
	if((state & Undofile::FS_MOVE_NEXT) && !files.prev(ptr))
	  files.last(ptr);
      }

      lg_ptr.p->m_outstanding_fs = cnt;
      lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx += tot;
      lg_ptr.p->m_next_reply_ptr_i = ptr.i;
    }
    client_unlock(number(), __LINE__);
    return;
  }

  lg_ptr.p->m_outstanding_fs = cnt - 1;

  // Outside FS_EXECUTING, m_outstanding holds the id of the page read into
  Ptr<GlobalPage> page_ptr;
  m_shared_page_pool.getPtr(page_ptr, ptr.p->m_online.m_outstanding);
  ptr.p->m_online.m_outstanding= 0;

  File_formats::Undofile::Undo_page* page =
    (File_formats::Undofile::Undo_page*)page_ptr.p;

  // Assemble the 64-bit page LSN from its two header words
  Uint64 lsn = 0;
  lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
  lsn += page->m_page_header.m_page_lsn_lo;

  switch(ptr.p->m_state){
  case Undofile::FS_SORTING:
    jam();
    break;
  case Undofile::FS_SEARCHING:
    jam();
    find_log_head_in_file(signal, lg_ptr, ptr, lsn);
    client_unlock(number(), __LINE__);
    return;
  default:
  case Undofile::FS_EXECUTING:
  case Undofile::FS_CREATING:
  case Undofile::FS_DROPPING:
  case Undofile::FS_ONLINE:
  case Undofile::FS_OPENING:
  case Undofile::FS_EMPTY:
    // No read should complete in any of these states
    jam();
    ndbrequire(false);
  }

  /**
   * Prepare for execution
   */
  ptr.p->m_state = Undofile::FS_EXECUTING;
  ptr.p->m_online.m_lsn = lsn;

  /**
   * Insert into m_files
   */
  {
    Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
    Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
    meta.remove(ptr);

    // Skip all files whose LSN is not greater than this file's LSN
    Ptr<Undofile> loop;
    files.first(loop);
    while(!loop.isNull() && loop.p->m_online.m_lsn <= lsn)
      files.next(loop);

    if(loop.isNull())
    {
      /**
       * File has highest lsn, add last
       */
      jam();
      files.add(ptr);
    }
    else
    {
      /**
       * Insert file in correct position in file list
       */
      files.insert(ptr, loop);
    }
  }
  find_log_head(signal, lg_ptr);
  client_unlock(number(), __LINE__);
}
2710 
/**
 * Signal handler for FSREADREF.
 *
 * Hands the signal to SimulatedBlock::execFSREADREF() and then
 * unconditionally fails ndbrequire(false); no recovery is attempted here.
 */
void
Lgman::execFSREADREF(Signal* signal)
{
  jamEntry();
  SimulatedBlock::execFSREADREF(signal);
  ndbrequire(false);
}
2718 
/**
 * One step of the binary search for the log head page within a file.
 *
 * While searching, the group's position fields hold the search state:
 * m_file_pos[TAIL].m_idx is the left bound, m_file_pos[HEAD].m_idx the
 * right bound and m_file_pos[HEAD].m_ptr_i the page that was just read.
 * If the page's LSN is higher than the highest seen so far for the file,
 * the left bound moves to that page, otherwise the right bound does.
 * While the bounds were more than one page apart, the midpoint is read
 * next. Otherwise the head has been found: LSNs and HEAD/TAIL positions
 * are set up, the result is logged and the next logfile group is
 * scheduled through CONTINUEB(FIND_LOG_HEAD).
 *
 * @param signal    Signal object used for outgoing requests
 * @param ptr       Logfile group being searched
 * @param file_ptr  File being searched
 * @param last_lsn  LSN of the page that was just read
 */
void
Lgman::find_log_head_in_file(Signal* signal,
			     Ptr<Logfile_group> ptr,
			     Ptr<Undofile> file_ptr,
			     Uint64 last_lsn)
{
  //     a b
  // 3 4 5 0 1
  Uint32 curr= ptr.p->m_file_pos[HEAD].m_ptr_i; // page just read
  Uint32 head= ptr.p->m_file_pos[HEAD].m_idx;   // right bound
  Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx;   // left bound

  ndbrequire(head > tail);
  // Width of the interval before this step narrows it
  Uint32 diff = head - tail;

  if(DEBUG_SEARCH_LOG_HEAD)
    printf("tail: %d(%lld) head: %d last: %d(%lld) -> ",
	   tail, file_ptr.p->m_online.m_lsn,
	   head, curr, last_lsn);
  if(last_lsn > file_ptr.p->m_online.m_lsn)
  {
    if(DEBUG_SEARCH_LOG_HEAD)
      printf("moving tail ");

    // Newer page found: remember its LSN and move the left bound
    file_ptr.p->m_online.m_lsn = last_lsn;
    ptr.p->m_file_pos[TAIL].m_idx = tail = curr;
  }
  else
  {
    if(DEBUG_SEARCH_LOG_HEAD)
      printf("moving head ");

    ptr.p->m_file_pos[HEAD].m_idx = head = curr;
  }

  if(diff > 1)
  {
    // We need to find more pages to be sure...
    ptr.p->m_file_pos[HEAD].m_ptr_i = curr = ((head + tail) >> 1);

    if(DEBUG_SEARCH_LOG_HEAD)
      ndbout_c("-> new search tail: %d(%lld) head: %d -> %d",
	       tail, file_ptr.p->m_online.m_lsn,
	       head, curr);

    Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
    // m_outstanding carries the buffer page id until the read completes
    file_ptr.p->m_online.m_outstanding= page_id;

    FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
    req->filePointer = file_ptr.p->m_fd;
    req->userReference = reference();
    req->userPointer = file_ptr.i;
    req->varIndex = curr;
    req->numberOfPages = 1;
    req->data.pageData[0] = page_id;
    req->operationFlag = 0;
    FsReadWriteReq::setFormatFlag(req->operationFlag,
				  FsReadWriteReq::fsFormatSharedPage);

    sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
	       FsReadWriteReq::FixedLength + 1, JBA);

    ptr.p->m_outstanding_fs++;
    file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
    return;
  }

  ndbrequire(diff == 1);
  if(DEBUG_SEARCH_LOG_HEAD)
    ndbout_c("-> found last page: %d", tail);

  // Search finished: the highest LSN seen becomes the group's (and the
  // block's) last LSN
  ptr.p->m_state = 0;
  file_ptr.p->m_state = Undofile::FS_EXECUTING;
  ptr.p->m_last_lsn = file_ptr.p->m_online.m_lsn;
  ptr.p->m_last_read_lsn = file_ptr.p->m_online.m_lsn;
  ptr.p->m_last_synced_lsn = file_ptr.p->m_online.m_lsn;
  m_last_lsn = file_ptr.p->m_online.m_lsn;

  /**
   * Set HEAD position
   */
  ptr.p->m_file_pos[HEAD].m_ptr_i = file_ptr.i;
  ptr.p->m_file_pos[HEAD].m_idx = tail;

  ptr.p->m_file_pos[TAIL].m_ptr_i = file_ptr.i;
  ptr.p->m_file_pos[TAIL].m_idx = tail - 1;
  ptr.p->m_next_reply_ptr_i = file_ptr.i;

  {
    Local_undofile_list files(m_file_pool, ptr.p->m_files);
    if(tail == 1)
    {
      /**
       * HEAD is first page in a file...
       *   -> PREV should be in previous file
       */
      Ptr<Undofile> prev = file_ptr;
      if(!files.prev(prev))
      {
	files.last(prev);
      }
      ptr.p->m_file_pos[TAIL].m_ptr_i = prev.i;
      ptr.p->m_file_pos[TAIL].m_idx = prev.p->m_file_size - 1;
      ptr.p->m_next_reply_ptr_i = prev.i;
    }

    SimulatedBlock* fs = globalData.getBlock(NDBFS);
    infoEvent("Undo head - %s page: %d lsn: %lld",
	      fs->get_filename(file_ptr.p->m_fd),
	      tail, file_ptr.p->m_online.m_lsn);
    g_eventLogger->info("Undo head - %s page: %d lsn: %lld",
                        fs->get_filename(file_ptr.p->m_fd),
                        tail, file_ptr.p->m_online.m_lsn);

    // Report the remaining files, walking backwards from the head file
    for(files.prev(file_ptr); !file_ptr.isNull(); files.prev(file_ptr))
    {
      infoEvent("   - next - %s(%lld)",
		fs->get_filename(file_ptr.p->m_fd),
		file_ptr.p->m_online.m_lsn);

      g_eventLogger->info("   - next - %s(%lld)",
                          fs->get_filename(file_ptr.p->m_fd),
                          file_ptr.p->m_online.m_lsn);
    }
  }

  /**
   * Start next logfile group
   */
  m_logfile_group_list.next(ptr);
  signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
  signal->theData[1] = ptr.i;
  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}
2853 
/**
 * Prepare all logfile groups for running the undo log and start it.
 *
 * Every group is moved from m_logfile_group_list into a temporary list:
 * groups already LG_ONLINE (no log files) are appended unchanged; for all
 * others the buffer positions are initialised, a READ_UNDO_LOG producer
 * thread is started and the group is inserted ordered by m_last_read_lsn.
 * The temporary list then replaces m_logfile_group_list.
 *
 * If no group has any log files, START_RECCONF is sent to DBLQH;
 * otherwise execution of undo records begins.
 */
void
Lgman::init_run_undo_log(Signal* signal)
{
  /**
   * Perform initial sorting of logfile groups
   */
  Ptr<Logfile_group> group;
  Logfile_group_list& list= m_logfile_group_list;
  Logfile_group_list tmp(m_logfile_group_pool);

  bool found_any = false;

  list.first(group);
  while(!group.isNull())
  {
    // Advance the iterator before unlinking the current element
    Ptr<Logfile_group> ptr= group;
    list.next(group);
    list.remove(ptr);

    if (ptr.p->m_state & Logfile_group::LG_ONLINE)
    {
      /**
       * No logfiles in group
       */
      jam();
      tmp.addLast(ptr);
      continue;
    }

    found_any = true;

    {
      /**
       * Init buffer pointers
       */
      ptr.p->m_free_buffer_words -= File_formats::UNDO_PAGE_WORDS;
      ptr.p->m_pos[CONSUMER].m_current_page.m_idx = 0; // 0 more pages read
      ptr.p->m_pos[PRODUCER].m_current_page.m_idx = 0; // 0 more pages read

      Uint32 page = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
      File_formats::Undofile::Undo_page* pageP =
	(File_formats::Undofile::Undo_page*)m_shared_page_pool.getPtr(page);

      // Consumer starts at the end of the used part of the page
      ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = pageP->m_words_used;
      ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 1;
      ptr.p->m_last_read_lsn++;
    }

    /**
     * Start producer thread
     */
    signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
    signal->theData[1] = ptr.i;
    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);

    /**
     * Insert in correct position in list of logfile_group's
     */
    Ptr<Logfile_group> pos;
    for(tmp.first(pos); !pos.isNull(); tmp.next(pos))
      if(ptr.p->m_last_read_lsn >= pos.p->m_last_read_lsn)
	break;

    if(pos.isNull())
      tmp.add(ptr);
    else
      tmp.insert(ptr, pos);

    ptr.p->m_state =
      Logfile_group::LG_EXEC_THREAD | Logfile_group::LG_READ_THREAD;
  }
  // Replace the (now empty) group list with the re-ordered one
  list = tmp;

  if (found_any == false)
  {
    /**
     * No logfilegroup had any logfiles
     */
    jam();
    signal->theData[0] = reference();
    sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
    return;
  }

  execute_undo_record(signal);
}
2940 
2941 void
read_undo_log(Signal * signal,Ptr<Logfile_group> ptr)2942 Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr)
2943 {
2944   Uint32 cnt, free= ptr.p->m_free_buffer_words;
2945 
2946   if(! (ptr.p->m_state & Logfile_group::LG_EXEC_THREAD))
2947   {
2948     jam();
2949     /**
2950      * Logfile_group is done...
2951      */
2952     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_READ_THREAD;
2953     stop_run_undo_log(signal);
2954     return;
2955   }
2956 
2957   if(free <= File_formats::UNDO_PAGE_WORDS)
2958   {
2959     signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2960     signal->theData[1] = ptr.i;
2961     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
2962     return;
2963   }
2964 
2965   Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
2966   Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
2967 
2968   if(producer.m_current_page.m_idx == 0)
2969   {
2970     /**
2971      * zero pages left in range -> switch range
2972      */
2973     Lgman::Page_map::Iterator it;
2974     Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
2975     Uint32 sz = map.getSize();
2976     Uint32 pos= (producer.m_current_page.m_ptr_i + sz - 2) % sz;
2977     map.position(it, pos);
2978     union {
2979       Uint32 _tmp[2];
2980       Lgman::Buffer_idx range;
2981     };
2982     _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
2983     producer.m_current_page.m_ptr_i = pos;
2984     producer.m_current_page.m_idx = range.m_idx;
2985     producer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx;
2986   }
2987 
2988   if(producer.m_current_page.m_ptr_i == consumer.m_current_page.m_ptr_i &&
2989      producer.m_current_pos.m_ptr_i > consumer.m_current_pos.m_ptr_i)
2990   {
2991     Uint32 max=
2992       producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1;
2993     ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
2994     cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
2995     ndbrequire(cnt <= max);
2996     producer.m_current_pos.m_ptr_i -= cnt;
2997     producer.m_current_page.m_idx -= cnt;
2998   }
2999   else
3000   {
3001     Uint32 max= producer.m_current_page.m_idx;
3002     ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
3003     cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
3004     ndbrequire(cnt <= max);
3005     producer.m_current_pos.m_ptr_i -= cnt;
3006     producer.m_current_page.m_idx -= cnt;
3007   }
3008 
3009   ndbrequire(free >= cnt * File_formats::UNDO_PAGE_WORDS);
3010   free -= (cnt * File_formats::UNDO_PAGE_WORDS);
3011   ptr.p->m_free_buffer_words = free;
3012   ptr.p->m_pos[PRODUCER] = producer;
3013 
3014   signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
3015   signal->theData[1] = ptr.i;
3016 
3017   if(free > File_formats::UNDO_PAGE_WORDS)
3018     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
3019   else
3020     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
3021 }
3022 
3023 Uint32
read_undo_pages(Signal * signal,Ptr<Logfile_group> ptr,Uint32 pageId,Uint32 pages)3024 Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr,
3025 		       Uint32 pageId, Uint32 pages)
3026 {
3027   ndbrequire(pages);
3028   Ptr<Undofile> filePtr;
3029   Buffer_idx tail= ptr.p->m_file_pos[TAIL];
3030   m_file_pool.getPtr(filePtr, tail.m_ptr_i);
3031 
3032   if(filePtr.p->m_online.m_outstanding > 0)
3033   {
3034     jam();
3035     return 0;
3036   }
3037 
3038   Uint32 max= tail.m_idx;
3039 
3040   FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
3041   req->filePointer = filePtr.p->m_fd;
3042   req->userReference = reference();
3043   req->userPointer = filePtr.i;
3044   req->operationFlag = 0;
3045   FsReadWriteReq::setFormatFlag(req->operationFlag,
3046 				FsReadWriteReq::fsFormatSharedPage);
3047 
3048 
3049   if(max > pages)
3050   {
3051     jam();
3052     tail.m_idx -= pages;
3053 
3054     req->varIndex = 1 + tail.m_idx;
3055     req->numberOfPages = pages;
3056     req->data.pageData[0] = pageId - pages;
3057     ptr.p->m_file_pos[TAIL] = tail;
3058 
3059     if(DEBUG_UNDO_EXECUTION)
3060       ndbout_c("a reading from file: %d page(%d-%d) into (%d-%d)",
3061 	       ptr.i, 1 + tail.m_idx, 1+tail.m_idx+pages-1,
3062 	       pageId - pages, pageId - 1);
3063 
3064     sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
3065 	       FsReadWriteReq::FixedLength + 1, JBA);
3066 
3067     ptr.p->m_outstanding_fs++;
3068     filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
3069     filePtr.p->m_online.m_outstanding = pages;
3070     max = pages;
3071   }
3072   else
3073   {
3074     jam();
3075 
3076     ndbrequire(tail.m_idx - max == 0);
3077     req->varIndex = 1;
3078     req->numberOfPages = max;
3079     req->data.pageData[0] = pageId - max;
3080 
3081     if(DEBUG_UNDO_EXECUTION)
3082       ndbout_c("b reading from file: %d page(%d-%d) into (%d-%d)",
3083 	       ptr.i, 1 , 1+max-1,
3084 	       pageId - max, pageId - 1);
3085 
3086     sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
3087 	       FsReadWriteReq::FixedLength + 1, JBA);
3088 
3089     ptr.p->m_outstanding_fs++;
3090     filePtr.p->m_online.m_outstanding = max;
3091     filePtr.p->m_state |= Undofile::FS_OUTSTANDING | Undofile::FS_MOVE_NEXT;
3092 
3093     Ptr<Undofile> prev = filePtr;
3094     {
3095       Local_undofile_list files(m_file_pool, ptr.p->m_files);
3096       if(!files.prev(prev))
3097       {
3098 	jam();
3099 	files.last(prev);
3100       }
3101     }
3102     if(DEBUG_UNDO_EXECUTION)
3103       ndbout_c("changing file from %d to %d", filePtr.i, prev.i);
3104 
3105     tail.m_idx= prev.p->m_file_size - 1;
3106     tail.m_ptr_i= prev.i;
3107     ptr.p->m_file_pos[TAIL] = tail;
3108     if(max < pages && filePtr.i != prev.i)
3109       max += read_undo_pages(signal, ptr, pageId - max, pages - max);
3110   }
3111 
3112   return max;
3113 
3114 }
3115 
3116 void
execute_undo_record(Signal * signal)3117 Lgman::execute_undo_record(Signal* signal)
3118 {
3119   Uint64 lsn;
3120   const Uint32* ptr;
3121   if((ptr = get_next_undo_record(&lsn)))
3122   {
3123     Uint32 len= (* ptr) & 0xFFFF;
3124     Uint32 type= (* ptr) >> 16;
3125     Uint32 mask= type & ~(Uint32)File_formats::Undofile::UNDO_NEXT_LSN;
3126     switch(mask){
3127     case File_formats::Undofile::UNDO_END:
3128       stop_run_undo_log(signal);
3129       return;
3130     case File_formats::Undofile::UNDO_LCP:
3131     case File_formats::Undofile::UNDO_LCP_FIRST:
3132     {
3133       Uint32 lcp = * (ptr - len + 1);
3134       if(m_latest_lcp && lcp > m_latest_lcp)
3135       {
3136         if (0)
3137         {
3138 	  const Uint32 * base = ptr - len + 1;
3139           Uint32 lcp = base[0];
3140           Uint32 tableId = base[1] >> 16;
3141           Uint32 fragId = base[1] & 0xFFFF;
3142 
3143 	  ndbout_c("NOT! ignoring lcp: %u tab: %u frag: %u",
3144 		   lcp, tableId, fragId);
3145 	}
3146       }
3147 
3148       if(m_latest_lcp == 0 ||
3149 	 lcp < m_latest_lcp ||
3150 	 (lcp == m_latest_lcp &&
3151 	  mask == File_formats::Undofile::UNDO_LCP_FIRST))
3152       {
3153 	stop_run_undo_log(signal);
3154 	return;
3155       }
3156       // Fallthrough
3157     }
3158     case File_formats::Undofile::UNDO_TUP_ALLOC:
3159     case File_formats::Undofile::UNDO_TUP_UPDATE:
3160     case File_formats::Undofile::UNDO_TUP_FREE:
3161     case File_formats::Undofile::UNDO_TUP_CREATE:
3162     case File_formats::Undofile::UNDO_TUP_DROP:
3163     case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
3164     case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
3165       {
3166         Dbtup_client tup(this, m_tup);
3167         tup.disk_restart_undo(signal, lsn, mask, ptr - len + 1, len);
3168         jamEntry();
3169       }
3170       return;
3171     default:
3172       ndbrequire(false);
3173     }
3174   }
3175   signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
3176   sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 1, JBB);
3177 
3178   return;
3179 }
3180 
3181 const Uint32*
get_next_undo_record(Uint64 * this_lsn)3182 Lgman::get_next_undo_record(Uint64 * this_lsn)
3183 {
3184   Ptr<Logfile_group> ptr;
3185   m_logfile_group_list.first(ptr);
3186 
3187   Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
3188   Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
3189   if(producer.m_current_pos.m_idx < 2)
3190   {
3191     jam();
3192     /**
3193      * Wait for fetching pages...
3194      */
3195     return 0;
3196   }
3197 
3198   Uint32 pos = consumer.m_current_pos.m_idx;
3199   Uint32 page = consumer.m_current_pos.m_ptr_i;
3200 
3201   File_formats::Undofile::Undo_page* pageP=(File_formats::Undofile::Undo_page*)
3202     m_shared_page_pool.getPtr(page);
3203 
3204   if(pos == 0)
3205   {
3206     /**
3207      * End of log
3208      */
3209     pageP->m_data[0] = (File_formats::Undofile::UNDO_END << 16) | 1 ;
3210     pageP->m_page_header.m_page_lsn_hi = 0;
3211     pageP->m_page_header.m_page_lsn_lo = 0;
3212     pos= consumer.m_current_pos.m_idx= pageP->m_words_used = 1;
3213     this_lsn = 0;
3214     return pageP->m_data;
3215   }
3216 
3217   Uint32 *record= pageP->m_data + pos - 1;
3218   Uint32 len= (* record) & 0xFFFF;
3219   ndbrequire(len);
3220   Uint32 *prev= record - len;
3221   Uint64 lsn = 0;
3222 
3223   // Same page
3224   if(((* record) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
3225   {
3226     lsn = ptr.p->m_last_read_lsn - 1;
3227     ndbrequire((Int64)lsn >= 0);
3228   }
3229   else
3230   {
3231     ndbrequire(pos >= 3);
3232     lsn += * (prev - 1); lsn <<= 32;
3233     lsn += * (prev - 0);
3234     len += 2;
3235     ndbrequire((Int64)lsn >= 0);
3236   }
3237 
3238 
3239   ndbrequire(pos >= len);
3240 
3241   if(pos == len)
3242   {
3243     /**
3244      * Switching page
3245      */
3246     ndbrequire(producer.m_current_pos.m_idx);
3247     ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --;
3248 
3249     if(consumer.m_current_page.m_idx)
3250     {
3251       consumer.m_current_page.m_idx--;   // left in range
3252       consumer.m_current_pos.m_ptr_i --; // page
3253     }
3254     else
3255     {
3256       // 0 pages left in range...switch range
3257       Lgman::Page_map::Iterator it;
3258       Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
3259       Uint32 sz = map.getSize();
3260       Uint32 tmp = (consumer.m_current_page.m_ptr_i + sz - 2) % sz;
3261 
3262       map.position(it, tmp);
3263       union {
3264 	Uint32 _tmp[2];
3265 	Lgman::Buffer_idx range;
3266       };
3267 
3268       _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
3269 
3270       consumer.m_current_page.m_idx = range.m_idx - 1; // left in range
3271       consumer.m_current_page.m_ptr_i = tmp;           // pos in map
3272 
3273       consumer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx - 1; // page
3274     }
3275 
3276     if(DEBUG_UNDO_EXECUTION)
3277       ndbout_c("reading from %d", consumer.m_current_pos.m_ptr_i);
3278 
3279     pageP=(File_formats::Undofile::Undo_page*)
3280       m_shared_page_pool.getPtr(consumer.m_current_pos.m_ptr_i);
3281 
3282     pos= consumer.m_current_pos.m_idx= pageP->m_words_used;
3283 
3284     Uint64 tmp = 0;
3285     tmp += pageP->m_page_header.m_page_lsn_hi; tmp <<= 32;
3286     tmp += pageP->m_page_header.m_page_lsn_lo;
3287 
3288     prev = pageP->m_data + pos - 1;
3289 
3290     if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
3291     {
3292       ndbrequire(lsn + 1 == ptr.p->m_last_read_lsn);
3293     }
3294 
3295     ptr.p->m_pos[CONSUMER] = consumer;
3296     ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS;
3297   }
3298   else
3299   {
3300     ptr.p->m_pos[CONSUMER].m_current_pos.m_idx -= len;
3301   }
3302 
3303   * this_lsn = ptr.p->m_last_read_lsn = lsn;
3304 
3305   /**
3306    * Re-sort log file groups
3307    */
3308   Ptr<Logfile_group> sort = ptr;
3309   if(m_logfile_group_list.next(sort))
3310   {
3311     while(!sort.isNull() && sort.p->m_last_read_lsn > lsn)
3312       m_logfile_group_list.next(sort);
3313 
3314     if(sort.i != ptr.p->nextList)
3315     {
3316       m_logfile_group_list.remove(ptr);
3317       if(sort.isNull())
3318 	m_logfile_group_list.add(ptr);
3319       else
3320 	m_logfile_group_list.insert(ptr, sort);
3321     }
3322   }
3323   return record;
3324 }
3325 
3326 void
stop_run_undo_log(Signal * signal)3327 Lgman::stop_run_undo_log(Signal* signal)
3328 {
3329   bool running = false, outstanding = false;
3330   Ptr<Logfile_group> ptr;
3331   m_logfile_group_list.first(ptr);
3332   while(!ptr.isNull())
3333   {
3334     /**
3335      * Mark exec thread as completed
3336      */
3337     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_EXEC_THREAD;
3338 
3339     if(ptr.p->m_state & Logfile_group::LG_READ_THREAD)
3340     {
3341       /**
3342        * Thread is still running...wait for it to complete
3343        */
3344       running = true;
3345     }
3346     else if(ptr.p->m_outstanding_fs)
3347     {
3348       outstanding = true; // a FSREADREQ is outstanding...wait for it
3349     }
3350     else if(ptr.p->m_state != Logfile_group::LG_ONLINE)
3351     {
3352       /**
3353        * Fix log TAIL
3354        */
3355       ndbrequire(ptr.p->m_state == 0);
3356       ptr.p->m_state = Logfile_group::LG_ONLINE;
3357       Buffer_idx tail= ptr.p->m_file_pos[TAIL];
3358       Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
3359 
3360       while(pages)
3361       {
3362 	Ptr<Undofile> file;
3363 	m_file_pool.getPtr(file, tail.m_ptr_i);
3364 	Uint32 page= tail.m_idx;
3365 	Uint32 size= file.p->m_file_size;
3366 	ndbrequire(size >= page);
3367 	Uint32 diff= size - page;
3368 
3369 	if(pages >= diff)
3370 	{
3371 	  pages -= diff;
3372 	  Local_undofile_list files(m_file_pool, ptr.p->m_files);
3373 	  if(!files.next(file))
3374 	    files.first(file);
3375 	  tail.m_idx = 1;
3376 	  tail.m_ptr_i= file.i;
3377 	}
3378 	else
3379 	{
3380 	  tail.m_idx += pages;
3381 	  pages= 0;
3382 	}
3383       }
3384       ptr.p->m_tail_pos[0] = tail;
3385       ptr.p->m_tail_pos[1] = tail;
3386       ptr.p->m_tail_pos[2] = tail;
3387       ptr.p->m_file_pos[TAIL] = tail;
3388 
3389       init_logbuffer_pointers(ptr);
3390 
3391       {
3392 	Buffer_idx head= ptr.p->m_file_pos[HEAD];
3393 	Ptr<Undofile> file;
3394 	m_file_pool.getPtr(file, head.m_ptr_i);
3395 	if (head.m_idx == file.p->m_file_size - 1)
3396 	{
3397 	  Local_undofile_list files(m_file_pool, ptr.p->m_files);
3398 	  if(!files.next(file))
3399 	  {
3400 	    jam();
3401 	    files.first(file);
3402 	  }
3403 	  head.m_idx = 0;
3404 	  head.m_ptr_i = file.i;
3405 	  ptr.p->m_file_pos[HEAD] = head;
3406 	}
3407       }
3408 
3409       client_lock(number(), __LINE__);
3410       ptr.p->m_free_file_words = (Uint64)File_formats::UNDO_PAGE_WORDS *
3411 	(Uint64)compute_free_file_pages(ptr);
3412       client_unlock(number(), __LINE__);
3413       ptr.p->m_next_reply_ptr_i = ptr.p->m_file_pos[HEAD].m_ptr_i;
3414 
3415       ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
3416       signal->theData[0] = LgmanContinueB::FLUSH_LOG;
3417       signal->theData[1] = ptr.i;
3418       signal->theData[2] = 0;
3419       sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
3420 
3421       if(1)
3422       {
3423 	SimulatedBlock* fs = globalData.getBlock(NDBFS);
3424 	Ptr<Undofile> hf, tf;
3425 	m_file_pool.getPtr(tf, tail.m_ptr_i);
3426 	m_file_pool.getPtr(hf,  ptr.p->m_file_pos[HEAD].m_ptr_i);
3427 	infoEvent("Logfile group: %d ", ptr.p->m_logfile_group_id);
3428         g_eventLogger->info("Logfile group: %d ", ptr.p->m_logfile_group_id);
3429 	infoEvent("  head: %s page: %d",
3430                   fs->get_filename(hf.p->m_fd),
3431                   ptr.p->m_file_pos[HEAD].m_idx);
3432         g_eventLogger->info("  head: %s page: %d",
3433                             fs->get_filename(hf.p->m_fd),
3434                             ptr.p->m_file_pos[HEAD].m_idx);
3435 	infoEvent("  tail: %s page: %d",
3436 		  fs->get_filename(tf.p->m_fd), tail.m_idx);
3437         g_eventLogger->info("  tail: %s page: %d",
3438                             fs->get_filename(tf.p->m_fd), tail.m_idx);
3439       }
3440     }
3441 
3442     m_logfile_group_list.next(ptr);
3443   }
3444 
3445   if(running)
3446   {
3447     jam();
3448     return;
3449   }
3450 
3451   if(outstanding)
3452   {
3453     jam();
3454     signal->theData[0] = LgmanContinueB::STOP_UNDO_LOG;
3455     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 1);
3456     return;
3457   }
3458 
3459   infoEvent("Flushing page cache after undo completion");
3460   g_eventLogger->info("Flushing page cache after undo completion");
3461 
3462   /**
3463    * Start flushing pages (local, LCP)
3464    */
3465   LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
3466   ord->lcpId = m_latest_lcp;
3467   sendSignal(PGMAN_REF, GSN_LCP_FRAG_ORD, signal,
3468 	     LcpFragOrd::SignalLength, JBB);
3469 
3470   EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
3471   req->senderData = 0;
3472   req->senderRef = reference();
3473   req->backupId = m_latest_lcp;
3474   sendSignal(PGMAN_REF, GSN_END_LCP_REQ, signal,
3475 	     EndLcpReq::SignalLength, JBB);
3476 }
3477 
3478 void
execEND_LCP_CONF(Signal * signal)3479 Lgman::execEND_LCP_CONF(Signal* signal)
3480 {
3481   {
3482     Dbtup_client tup(this, m_tup);
3483     tup.disk_restart_undo(signal, 0, File_formats::Undofile::UNDO_END, 0, 0);
3484     jamEntry();
3485   }
3486 
3487   /**
3488    * pgman has completed flushing all pages
3489    *
3490    *   insert "fake" LCP record preventing undo to be "rerun"
3491    */
3492   Uint32 undo[3];
3493   undo[0] = m_latest_lcp;
3494   undo[1] = (0 << 16) | 0;
3495   undo[2] = (File_formats::Undofile::UNDO_LCP_FIRST << 16 )
3496     | (sizeof(undo) >> 2);
3497 
3498   Ptr<Logfile_group> ptr;
3499   ndbrequire(m_logfile_group_list.first(ptr));
3500 
3501   Uint64 last_lsn= m_last_lsn;
3502   if(ptr.p->m_last_lsn == last_lsn
3503 #ifdef VM_TRACE
3504      && ((rand() % 100) > 50)
3505 #endif
3506      )
3507   {
3508     undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
3509     Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
3510     memcpy(dst, undo, sizeof(undo));
3511     ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
3512     ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
3513   }
3514   else
3515   {
3516     Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
3517     * dst++ = (Uint32)(last_lsn >> 32);
3518     * dst++ = (Uint32)(last_lsn & 0xFFFFFFFF);
3519     memcpy(dst, undo, sizeof(undo));
3520     ndbrequire(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
3521     ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
3522   }
3523   m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
3524 
3525   ptr.p->m_last_synced_lsn = last_lsn;
3526   while(m_logfile_group_list.next(ptr))
3527     ptr.p->m_last_synced_lsn = last_lsn;
3528 
3529   infoEvent("Flushing complete");
3530   g_eventLogger->info("Flushing complete");
3531 
3532   signal->theData[0] = reference();
3533   sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
3534 }
3535 
3536 #ifdef VM_TRACE
3537 void
validate_logfile_group(Ptr<Logfile_group> ptr,const char * heading)3538 Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading)
3539 {
3540   do
3541   {
3542     if (ptr.p->m_file_pos[HEAD].m_ptr_i == RNIL)
3543       break;
3544 
3545     Uint32 pages = compute_free_file_pages(ptr);
3546 
3547     Uint32 group_pages =
3548       ((ptr.p->m_free_file_words + File_formats::UNDO_PAGE_WORDS - 1)/ File_formats::UNDO_PAGE_WORDS) ;
3549     Uint32 last = ptr.p->m_free_file_words % File_formats::UNDO_PAGE_WORDS;
3550 
3551     if(! (pages >= group_pages))
3552     {
3553       ndbout << heading << " Tail: " << ptr.p->m_file_pos[TAIL]
3554 	     << " Head: " << ptr.p->m_file_pos[HEAD]
3555 	     << " free: " << group_pages << "(" << last << ")"
3556 	     << " found: " << pages;
3557       for(Uint32 i = 0; i<3; i++)
3558       {
3559 	ndbout << " - " << ptr.p->m_tail_pos[i];
3560       }
3561       ndbout << endl;
3562 
3563       ndbrequire(pages >= group_pages);
3564     }
3565   } while(0);
3566 }
3567 #endif
3568 
execGET_TABINFOREQ(Signal * signal)3569 void Lgman::execGET_TABINFOREQ(Signal* signal)
3570 {
3571   jamEntry();
3572 
3573   if(!assembleFragments(signal))
3574   {
3575     return;
3576   }
3577 
3578   GetTabInfoReq * const req = (GetTabInfoReq *)&signal->theData[0];
3579 
3580   const Uint32 reqType = req->requestType & (~GetTabInfoReq::LongSignalConf);
3581   BlockReference retRef= req->senderRef;
3582   Uint32 senderData= req->senderData;
3583   Uint32 tableId= req->tableId;
3584 
3585   if(reqType == GetTabInfoReq::RequestByName)
3586   {
3587     jam();
3588     SectionHandle handle(this, signal);
3589     releaseSections(handle);
3590 
3591     sendGET_TABINFOREF(signal, req, GetTabInfoRef::NoFetchByName);
3592     return;
3593   }
3594 
3595   Logfile_group key;
3596   key.m_logfile_group_id= tableId;
3597   Ptr<Logfile_group> ptr;
3598   m_logfile_group_hash.find(ptr, key);
3599 
3600   if(ptr.p->m_logfile_group_id != tableId)
3601   {
3602     jam();
3603 
3604     sendGET_TABINFOREF(signal, req, GetTabInfoRef::InvalidTableId);
3605     return;
3606   }
3607 
3608 
3609   GetTabInfoConf *conf = (GetTabInfoConf *)&signal->theData[0];
3610 
3611   conf->senderData= senderData;
3612   conf->tableId= tableId;
3613   conf->freeWordsHi= (Uint32)(ptr.p->m_free_file_words >> 32);
3614   conf->freeWordsLo= (Uint32)(ptr.p->m_free_file_words & 0xFFFFFFFF);
3615   conf->tableType= DictTabInfo::LogfileGroup;
3616   conf->senderRef= reference();
3617   sendSignal(retRef, GSN_GET_TABINFO_CONF, signal,
3618 	     GetTabInfoConf::SignalLength, JBB);
3619 }
3620 
sendGET_TABINFOREF(Signal * signal,GetTabInfoReq * req,GetTabInfoRef::ErrorCode errorCode)3621 void Lgman::sendGET_TABINFOREF(Signal* signal,
3622 			       GetTabInfoReq * req,
3623 			       GetTabInfoRef::ErrorCode errorCode)
3624 {
3625   jamEntry();
3626   GetTabInfoRef * const ref = (GetTabInfoRef *)&signal->theData[0];
3627   /**
3628    * The format of GetTabInfo Req/Ref is the same
3629    */
3630   BlockReference retRef = req->senderRef;
3631   ref->errorCode = errorCode;
3632 
3633   sendSignal(retRef, GSN_GET_TABINFOREF, signal, signal->length(), JBB);
3634 }
3635