1 /*
2 Copyright (c) 2005, 2011, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "lgman.hpp"
26 #include "diskpage.hpp"
27 #include <signaldata/FsRef.hpp>
28 #include <signaldata/FsConf.hpp>
29 #include <signaldata/FsOpenReq.hpp>
30 #include <signaldata/FsCloseReq.hpp>
31 #include <signaldata/CreateFilegroupImpl.hpp>
32 #include <signaldata/DropFilegroupImpl.hpp>
33 #include <signaldata/FsReadWriteReq.hpp>
34 #include <signaldata/LCP.hpp>
35 #include <signaldata/SumaImpl.hpp>
36 #include <signaldata/LgmanContinueB.hpp>
37 #include <signaldata/GetTabInfo.hpp>
38 #include <signaldata/NodeFailRep.hpp>
39 #include <signaldata/DbinfoScan.hpp>
40 #include "dbtup/Dbtup.hpp"
41
42 #include <EventLogger.hpp>
43 extern EventLogger * g_eventLogger;
44
45 #include <record_types.hpp>
46
47 /**
48 * ---<a>-----<b>-----<c>-----<d>---> (time)
49 *
50 * <a> = start of lcp 1
51 * <b> = stop of lcp 1
52 * <c> = start of lcp 2
53 * <d> = stop of lcp 2
54 *
 * If ndb crashes before <d>,
 *   the entire undo log from the crash point back to <a> has to be applied.
 *
 * At <d> the undo log can be cut up to <c>.
59 */
60
61 #define DEBUG_UNDO_EXECUTION 0
62 #define DEBUG_SEARCH_LOG_HEAD 0
63
64 #define FREE_BUFFER_MARGIN (2 * File_formats::UNDO_PAGE_WORDS)
65
Lgman(Block_context & ctx)66 Lgman::Lgman(Block_context & ctx) :
67 SimulatedBlock(LGMAN, ctx),
68 m_tup(0),
69 m_logfile_group_list(m_logfile_group_pool),
70 m_logfile_group_hash(m_logfile_group_pool),
71 m_client_mutex("lgman-client", 2, true)
72 {
73 BLOCK_CONSTRUCTOR(Lgman);
74
75 // Add received signals
76 addRecSignal(GSN_STTOR, &Lgman::execSTTOR);
77 addRecSignal(GSN_READ_CONFIG_REQ, &Lgman::execREAD_CONFIG_REQ);
78 addRecSignal(GSN_DUMP_STATE_ORD, &Lgman::execDUMP_STATE_ORD);
79 addRecSignal(GSN_DBINFO_SCANREQ, &Lgman::execDBINFO_SCANREQ);
80 addRecSignal(GSN_CONTINUEB, &Lgman::execCONTINUEB);
81 addRecSignal(GSN_NODE_FAILREP, &Lgman::execNODE_FAILREP);
82
83 addRecSignal(GSN_CREATE_FILE_IMPL_REQ, &Lgman::execCREATE_FILE_IMPL_REQ);
84 addRecSignal(GSN_CREATE_FILEGROUP_IMPL_REQ,
85 &Lgman::execCREATE_FILEGROUP_IMPL_REQ);
86
87 addRecSignal(GSN_DROP_FILE_IMPL_REQ, &Lgman::execDROP_FILE_IMPL_REQ);
88 addRecSignal(GSN_DROP_FILEGROUP_IMPL_REQ,
89 &Lgman::execDROP_FILEGROUP_IMPL_REQ);
90
91 addRecSignal(GSN_FSWRITEREQ, &Lgman::execFSWRITEREQ);
92 addRecSignal(GSN_FSWRITEREF, &Lgman::execFSWRITEREF, true);
93 addRecSignal(GSN_FSWRITECONF, &Lgman::execFSWRITECONF);
94
95 addRecSignal(GSN_FSOPENREF, &Lgman::execFSOPENREF, true);
96 addRecSignal(GSN_FSOPENCONF, &Lgman::execFSOPENCONF);
97
98 addRecSignal(GSN_FSCLOSECONF, &Lgman::execFSCLOSECONF);
99
100 addRecSignal(GSN_FSREADREF, &Lgman::execFSREADREF, true);
101 addRecSignal(GSN_FSREADCONF, &Lgman::execFSREADCONF);
102
103 addRecSignal(GSN_LCP_FRAG_ORD, &Lgman::execLCP_FRAG_ORD);
104 addRecSignal(GSN_END_LCP_REQ, &Lgman::execEND_LCP_REQ);
105 addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Lgman::execSUB_GCP_COMPLETE_REP);
106 addRecSignal(GSN_START_RECREQ, &Lgman::execSTART_RECREQ);
107
108 addRecSignal(GSN_END_LCP_CONF, &Lgman::execEND_LCP_CONF);
109
110 addRecSignal(GSN_GET_TABINFOREQ, &Lgman::execGET_TABINFOREQ);
111
112 m_last_lsn = 1;
113 m_logfile_group_hash.setSize(10);
114
115 if (isNdbMtLqh()) {
116 jam();
117 int ret = m_client_mutex.create();
118 ndbrequire(ret == 0);
119 }
120
121 {
122 CallbackEntry& ce = m_callbackEntry[THE_NULL_CALLBACK];
123 ce.m_function = TheNULLCallback.m_callbackFunction;
124 ce.m_flags = 0;
125 }
126 {
127 CallbackEntry& ce = m_callbackEntry[ENDLCP_CALLBACK];
128 ce.m_function = safe_cast(&Lgman::endlcp_callback);
129 ce.m_flags = 0;
130 }
131 {
132 CallbackTable& ct = m_callbackTable;
133 ct.m_count = COUNT_CALLBACKS;
134 ct.m_entry = m_callbackEntry;
135 m_callbackTableAddr = &ct;
136 }
137 }
138
~Lgman()139 Lgman::~Lgman()
140 {
141 if (isNdbMtLqh()) {
142 (void)m_client_mutex.destroy();
143 }
144 }
145
146 void
client_lock(BlockNumber block,int line)147 Lgman::client_lock(BlockNumber block, int line)
148 {
149 if (isNdbMtLqh()) {
150 #ifdef VM_TRACE
151 Uint32 bno = blockToMain(block);
152 Uint32 ino = blockToInstance(block);
153 #endif
154 D("try lock " << bno << "/" << ino << V(line));
155 int ret = m_client_mutex.lock();
156 ndbrequire(ret == 0);
157 D("got lock " << bno << "/" << ino << V(line));
158 }
159 }
160
161 void
client_unlock(BlockNumber block,int line)162 Lgman::client_unlock(BlockNumber block, int line)
163 {
164 if (isNdbMtLqh()) {
165 #ifdef VM_TRACE
166 Uint32 bno = blockToMain(block);
167 Uint32 ino = blockToInstance(block);
168 #endif
169 D("unlock " << bno << "/" << ino << V(line));
170 int ret = m_client_mutex.unlock();
171 ndbrequire(ret == 0);
172 }
173 }
174
BLOCK_FUNCTIONS(Lgman)175 BLOCK_FUNCTIONS(Lgman)
176
177 void
178 Lgman::execREAD_CONFIG_REQ(Signal* signal)
179 {
180 jamEntry();
181
182 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
183
184 Uint32 ref = req->senderRef;
185 Uint32 senderData = req->senderData;
186
187 const ndb_mgm_configuration_iterator * p =
188 m_ctx.m_config.getOwnConfigIterator();
189 ndbrequire(p != 0);
190
191 Pool_context pc;
192 pc.m_block = this;
193 m_log_waiter_pool.wo_pool_init(RT_LGMAN_LOG_WAITER, pc);
194 m_file_pool.init(RT_LGMAN_FILE, pc);
195 m_logfile_group_pool.init(RT_LGMAN_FILEGROUP, pc);
196 // 10 -> 150M
197 m_data_buffer_pool.setSize(40);
198
199 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
200 conf->senderRef = reference();
201 conf->senderData = senderData;
202 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
203 ReadConfigConf::SignalLength, JBB);
204 }
205
206 void
execSTTOR(Signal * signal)207 Lgman::execSTTOR(Signal* signal)
208 {
209 jamEntry();
210 Uint32 startPhase = signal->theData[1];
211 switch (startPhase) {
212 case 1:
213 m_tup = globalData.getBlock(DBTUP);
214 ndbrequire(m_tup != 0);
215 break;
216 }
217 sendSTTORRY(signal);
218 }
219
220 void
sendSTTORRY(Signal * signal)221 Lgman::sendSTTORRY(Signal* signal)
222 {
223 signal->theData[0] = 0;
224 signal->theData[3] = 1;
225 signal->theData[4] = 2;
226 signal->theData[5] = 3;
227 signal->theData[6] = 4;
228 signal->theData[7] = 5;
229 signal->theData[8] = 6;
230 signal->theData[9] = 255; // No more start phases from missra
231 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
232 }
233
234 void
execCONTINUEB(Signal * signal)235 Lgman::execCONTINUEB(Signal* signal){
236 jamEntry();
237
238 Uint32 type= signal->theData[0];
239 Uint32 ptrI = signal->theData[1];
240 client_lock(number(), __LINE__);
241 switch(type){
242 case LgmanContinueB::FILTER_LOG:
243 jam();
244 break;
245 case LgmanContinueB::CUT_LOG_TAIL:
246 {
247 jam();
248 Ptr<Logfile_group> ptr;
249 m_logfile_group_pool.getPtr(ptr, ptrI);
250 cut_log_tail(signal, ptr);
251 break;
252 }
253 case LgmanContinueB::FLUSH_LOG:
254 {
255 jam();
256 Ptr<Logfile_group> ptr;
257 m_logfile_group_pool.getPtr(ptr, ptrI);
258 flush_log(signal, ptr, signal->theData[2]);
259 break;
260 }
261 case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
262 {
263 jam();
264 Ptr<Logfile_group> ptr;
265 m_logfile_group_pool.getPtr(ptr, ptrI);
266 process_log_buffer_waiters(signal, ptr);
267 break;
268 }
269 case LgmanContinueB::FIND_LOG_HEAD:
270 jam();
271 Ptr<Logfile_group> ptr;
272 if(ptrI != RNIL)
273 {
274 jam();
275 m_logfile_group_pool.getPtr(ptr, ptrI);
276 find_log_head(signal, ptr);
277 }
278 else
279 {
280 jam();
281 init_run_undo_log(signal);
282 }
283 break;
284 case LgmanContinueB::EXECUTE_UNDO_RECORD:
285 jam();
286 execute_undo_record(signal);
287 break;
288 case LgmanContinueB::STOP_UNDO_LOG:
289 jam();
290 stop_run_undo_log(signal);
291 break;
292 case LgmanContinueB::READ_UNDO_LOG:
293 {
294 jam();
295 Ptr<Logfile_group> ptr;
296 m_logfile_group_pool.getPtr(ptr, ptrI);
297 read_undo_log(signal, ptr);
298 break;
299 }
300 case LgmanContinueB::PROCESS_LOG_SYNC_WAITERS:
301 {
302 jam();
303 Ptr<Logfile_group> ptr;
304 m_logfile_group_pool.getPtr(ptr, ptrI);
305 process_log_sync_waiters(signal, ptr);
306 break;
307 }
308 case LgmanContinueB::FORCE_LOG_SYNC:
309 {
310 jam();
311 Ptr<Logfile_group> ptr;
312 m_logfile_group_pool.getPtr(ptr, ptrI);
313 force_log_sync(signal, ptr, signal->theData[2], signal->theData[3]);
314 break;
315 }
316 case LgmanContinueB::DROP_FILEGROUP:
317 {
318 jam();
319 Ptr<Logfile_group> ptr;
320 m_logfile_group_pool.getPtr(ptr, ptrI);
321 if ((ptr.p->m_state & Logfile_group::LG_THREAD_MASK) ||
322 ptr.p->m_outstanding_fs > 0)
323 {
324 jam();
325 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
326 signal->length());
327 break;
328 }
329 Uint32 ref = signal->theData[2];
330 Uint32 data = signal->theData[3];
331 drop_filegroup_drop_files(signal, ptr, ref, data);
332 break;
333 }
334 }
335 client_unlock(number(), __LINE__);
336 }
337
338 void
execNODE_FAILREP(Signal * signal)339 Lgman::execNODE_FAILREP(Signal* signal)
340 {
341 jamEntry();
342 const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
343 NdbNodeBitmask failed;
344 failed.assign(NdbNodeBitmask::Size, rep->theNodes);
345
346 /* Block level cleanup */
347 for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
348 jam();
349 if(failed.get(i)) {
350 jam();
351 Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
352 ndbassert(elementsCleaned == 0); // No distributed fragmented signals
353 (void) elementsCleaned; // Remove compiler warning
354 }//if
355 }//for
356 }
357
358 void
execDUMP_STATE_ORD(Signal * signal)359 Lgman::execDUMP_STATE_ORD(Signal* signal){
360 jamEntry();
361 if (signal->theData[0] == 12001 || signal->theData[0] == 12002)
362 {
363 char tmp[1024];
364 Ptr<Logfile_group> ptr;
365 m_logfile_group_list.first(ptr);
366 while(!ptr.isNull())
367 {
368 BaseString::snprintf(tmp, sizeof(tmp),
369 "lfg %u state: %x fs: %u lsn "
370 " [ last: %llu s(req): %llu s:ed: %llu lcp: %llu ] "
371 " waiters: %d %d",
372 ptr.p->m_logfile_group_id, ptr.p->m_state,
373 ptr.p->m_outstanding_fs,
374 ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn,
375 ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn,
376 !ptr.p->m_log_buffer_waiters.isEmpty(),
377 !ptr.p->m_log_sync_waiters.isEmpty());
378 if (signal->theData[0] == 12001)
379 infoEvent("%s", tmp);
380 ndbout_c("%s", tmp);
381
382 BaseString::snprintf(tmp, sizeof(tmp),
383 " callback_buffer_words: %u"
384 " free_buffer_words: %u free_file_words: %llu",
385 ptr.p->m_callback_buffer_words,
386 ptr.p->m_free_buffer_words,
387 ptr.p->m_free_file_words);
388 if (signal->theData[0] == 12001)
389 infoEvent("%s", tmp);
390 ndbout_c("%s", tmp);
391 if (!ptr.p->m_log_buffer_waiters.isEmpty())
392 {
393 Ptr<Log_waiter> waiter;
394 Local_log_waiter_list
395 list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
396 list.first(waiter);
397 BaseString::snprintf(tmp, sizeof(tmp),
398 " head(waiters).sz: %u %u",
399 waiter.p->m_size,
400 FREE_BUFFER_MARGIN);
401 if (signal->theData[0] == 12001)
402 infoEvent("%s", tmp);
403 ndbout_c("%s", tmp);
404 }
405 if (!ptr.p->m_log_sync_waiters.isEmpty())
406 {
407 Ptr<Log_waiter> waiter;
408 Local_log_waiter_list
409 list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
410 list.first(waiter);
411 BaseString::snprintf(tmp, sizeof(tmp),
412 " m_last_synced_lsn: %llu head(waiters %x).m_sync_lsn: %llu",
413 ptr.p->m_last_synced_lsn,
414 waiter.i,
415 waiter.p->m_sync_lsn);
416 if (signal->theData[0] == 12001)
417 infoEvent("%s", tmp);
418 ndbout_c("%s", tmp);
419
420 while(!waiter.isNull())
421 {
422 ndbout_c("ptr: %x %p lsn: %llu next: %x",
423 waiter.i, waiter.p, waiter.p->m_sync_lsn, waiter.p->nextList);
424 list.next(waiter);
425 }
426 }
427 m_logfile_group_list.next(ptr);
428 }
429 }
430 if (signal->theData[0] == 12003)
431 {
432 bool crash = false;
433 Ptr<Logfile_group> ptr;
434 for (m_logfile_group_list.first(ptr); !ptr.isNull();
435 m_logfile_group_list.next(ptr))
436 {
437 if (ptr.p->m_callback_buffer_words != 0)
438 {
439 crash = true;
440 break;
441 }
442 }
443
444 if (crash)
445 {
446 ndbout_c("Detected logfile-group with non zero m_callback_buffer_words");
447 signal->theData[0] = 12002;
448 execDUMP_STATE_ORD(signal);
449 ndbrequire(false);
450 }
451 #ifdef VM_TRACE
452 else
453 {
454 ndbout_c("Check for non zero m_callback_buffer_words OK!");
455 }
456 #endif
457 }
458 }
459
460 void
execDBINFO_SCANREQ(Signal * signal)461 Lgman::execDBINFO_SCANREQ(Signal *signal)
462 {
463 DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
464 const Ndbinfo::ScanCursor* cursor =
465 CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
466 Ndbinfo::Ratelimit rl;
467
468 jamEntry();
469
470 switch(req.tableId) {
471 case Ndbinfo::LOGSPACES_TABLEID:
472 {
473 jam();
474 Uint32 startBucket = cursor->data[0];
475 Logfile_group_hash_iterator iter;
476 m_logfile_group_hash.next(startBucket, iter);
477
478 while (!iter.curr.isNull())
479 {
480 jam();
481
482 Uint32 currentBucket = iter.bucket;
483 Ptr<Logfile_group> ptr = iter.curr;
484
485 Uint64 free = ptr.p->m_free_file_words*4;
486
487 Uint64 total = 0;
488 Local_undofile_list list(m_file_pool, ptr.p->m_files);
489 Ptr<Undofile> filePtr;
490 for (list.first(filePtr); !filePtr.isNull(); list.next(filePtr))
491 {
492 jam();
493 total += (Uint64)filePtr.p->m_file_size *
494 (Uint64)File_formats::NDB_PAGE_SIZE;
495 }
496
497 Uint64 high = 0; // TODO
498
499 Ndbinfo::Row row(signal, req);
500 row.write_uint32(getOwnNodeId());
501 row.write_uint32(1); // log type, 1 = DD-UNDO
502 row.write_uint32(ptr.p->m_logfile_group_id); // log id
503 row.write_uint32(0); // log part
504
505 row.write_uint64(total); // total allocated
506 row.write_uint64((total-free)); // currently in use
507 row.write_uint64(high); // in use high water mark
508 ndbinfo_send_row(signal, req, row, rl);
509
510 // move to next
511 if (m_logfile_group_hash.next(iter) == false)
512 {
513 jam(); // no more...
514 break;
515 }
516 else if (iter.bucket == currentBucket)
517 {
518 jam();
519 continue; // we need to iterate an entire bucket
520 }
521 else if (rl.need_break(req))
522 {
523 jam();
524 ndbinfo_send_scan_break(signal, req, rl, iter.bucket);
525 return;
526 }
527 }
528 break;
529 }
530
531 case Ndbinfo::LOGBUFFERS_TABLEID:
532 {
533 jam();
534 Uint32 startBucket = cursor->data[0];
535 Logfile_group_hash_iterator iter;
536 m_logfile_group_hash.next(startBucket, iter);
537
538 while (!iter.curr.isNull())
539 {
540 jam();
541
542 Uint32 currentBucket = iter.bucket;
543 Ptr<Logfile_group> ptr = iter.curr;
544
545 Uint64 free = ptr.p->m_free_buffer_words*4;
546 Uint64 total = ptr.p->m_total_buffer_words*4;
547 Uint64 high = 0; // TODO
548
549 Ndbinfo::Row row(signal, req);
550 row.write_uint32(getOwnNodeId());
551 row.write_uint32(1); // log type, 1 = DD-UNDO
552 row.write_uint32(ptr.p->m_logfile_group_id); // log id
553 row.write_uint32(0); // log part
554
555 row.write_uint64(total); // total allocated
556 row.write_uint64((total-free)); // currently in use
557 row.write_uint64(high); // in use high water mark
558 ndbinfo_send_row(signal, req, row, rl);
559
560 // move to next
561 if (m_logfile_group_hash.next(iter) == false)
562 {
563 jam(); // no more...
564 break;
565 }
566 else if (iter.bucket == currentBucket)
567 {
568 jam();
569 continue; // we need to iterate an entire bucket
570 }
571 else if (rl.need_break(req))
572 {
573 jam();
574 ndbinfo_send_scan_break(signal, req, rl, iter.bucket);
575 return;
576 }
577 }
578 break;
579 }
580
581 default:
582 break;
583 }
584
585 ndbinfo_send_scan_conf(signal, req, rl);
586 }
587
588 void
execCREATE_FILEGROUP_IMPL_REQ(Signal * signal)589 Lgman::execCREATE_FILEGROUP_IMPL_REQ(Signal* signal){
590 jamEntry();
591 CreateFilegroupImplReq* req= (CreateFilegroupImplReq*)signal->getDataPtr();
592
593 Uint32 senderRef = req->senderRef;
594 Uint32 senderData = req->senderData;
595
596 Ptr<Logfile_group> ptr;
597 CreateFilegroupImplRef::ErrorCode err = CreateFilegroupImplRef::NoError;
598 do {
599 if (m_logfile_group_hash.find(ptr, req->filegroup_id))
600 {
601 jam();
602 err = CreateFilegroupImplRef::FilegroupAlreadyExists;
603 break;
604 }
605
606 if (!m_logfile_group_list.isEmpty())
607 {
608 jam();
609 err = CreateFilegroupImplRef::OneLogfileGroupLimit;
610 break;
611 }
612
613 if (!m_logfile_group_pool.seize(ptr))
614 {
615 jam();
616 err = CreateFilegroupImplRef::OutOfFilegroupRecords;
617 break;
618 }
619
620 new (ptr.p) Logfile_group(req);
621
622 if (!alloc_logbuffer_memory(ptr, req->logfile_group.buffer_size))
623 {
624 jam();
625 err= CreateFilegroupImplRef::OutOfLogBufferMemory;
626 m_logfile_group_pool.release(ptr);
627 break;
628 }
629
630 m_logfile_group_hash.add(ptr);
631 m_logfile_group_list.add(ptr);
632
633 if ((getNodeState().getNodeRestartInProgress() &&
634 getNodeState().starting.restartType !=
635 NodeState::ST_INITIAL_NODE_RESTART)||
636 getNodeState().getSystemRestartInProgress())
637 {
638 ptr.p->m_state = Logfile_group::LG_STARTING;
639 }
640
641 CreateFilegroupImplConf* conf=
642 (CreateFilegroupImplConf*)signal->getDataPtr();
643 conf->senderData = senderData;
644 conf->senderRef = reference();
645 sendSignal(senderRef, GSN_CREATE_FILEGROUP_IMPL_CONF, signal,
646 CreateFilegroupImplConf::SignalLength, JBB);
647
648 return;
649 } while(0);
650
651 CreateFilegroupImplRef* ref= (CreateFilegroupImplRef*)signal->getDataPtr();
652 ref->senderData = senderData;
653 ref->senderRef = reference();
654 ref->errorCode = err;
655 sendSignal(senderRef, GSN_CREATE_FILEGROUP_IMPL_REF, signal,
656 CreateFilegroupImplRef::SignalLength, JBB);
657 }
658
659 void
execDROP_FILEGROUP_IMPL_REQ(Signal * signal)660 Lgman::execDROP_FILEGROUP_IMPL_REQ(Signal* signal)
661 {
662 jamEntry();
663
664 Uint32 errorCode = 0;
665 DropFilegroupImplReq req = *(DropFilegroupImplReq*)signal->getDataPtr();
666 do
667 {
668 Ptr<Logfile_group> ptr;
669 if (!m_logfile_group_hash.find(ptr, req.filegroup_id))
670 {
671 errorCode = DropFilegroupImplRef::NoSuchFilegroup;
672 break;
673 }
674
675 if (ptr.p->m_version != req.filegroup_version)
676 {
677 errorCode = DropFilegroupImplRef::InvalidFilegroupVersion;
678 break;
679 }
680
681 switch(req.requestInfo){
682 case DropFilegroupImplReq::Prepare:
683 break;
684 case DropFilegroupImplReq::Commit:
685 m_logfile_group_list.remove(ptr);
686 ptr.p->m_state |= Logfile_group::LG_DROPPING;
687 signal->theData[0] = LgmanContinueB::DROP_FILEGROUP;
688 signal->theData[1] = ptr.i;
689 signal->theData[2] = req.senderRef;
690 signal->theData[3] = req.senderData;
691 sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
692 return;
693 case DropFilegroupImplReq::Abort:
694 break;
695 default:
696 ndbrequire(false);
697 }
698 } while(0);
699
700 if (errorCode)
701 {
702 DropFilegroupImplRef* ref =
703 (DropFilegroupImplRef*)signal->getDataPtrSend();
704 ref->senderRef = reference();
705 ref->senderData = req.senderData;
706 ref->errorCode = errorCode;
707 sendSignal(req.senderRef, GSN_DROP_FILEGROUP_IMPL_REF, signal,
708 DropFilegroupImplRef::SignalLength, JBB);
709 }
710 else
711 {
712 DropFilegroupImplConf* conf =
713 (DropFilegroupImplConf*)signal->getDataPtrSend();
714 conf->senderRef = reference();
715 conf->senderData = req.senderData;
716 sendSignal(req.senderRef, GSN_DROP_FILEGROUP_IMPL_CONF, signal,
717 DropFilegroupImplConf::SignalLength, JBB);
718 }
719 }
720
721 void
drop_filegroup_drop_files(Signal * signal,Ptr<Logfile_group> ptr,Uint32 ref,Uint32 data)722 Lgman::drop_filegroup_drop_files(Signal* signal,
723 Ptr<Logfile_group> ptr,
724 Uint32 ref, Uint32 data)
725 {
726 jam();
727 ndbrequire(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
728 ndbrequire(ptr.p->m_outstanding_fs == 0);
729
730 Local_undofile_list list(m_file_pool, ptr.p->m_files);
731 Ptr<Undofile> file_ptr;
732
733 if (list.first(file_ptr))
734 {
735 jam();
736 ndbrequire(! (file_ptr.p->m_state & Undofile::FS_OUTSTANDING));
737 file_ptr.p->m_create.m_senderRef = ref;
738 file_ptr.p->m_create.m_senderData = data;
739 create_file_abort(signal, ptr, file_ptr);
740 return;
741 }
742
743 Local_undofile_list metalist(m_file_pool, ptr.p->m_meta_files);
744 if (metalist.first(file_ptr))
745 {
746 jam();
747 metalist.remove(file_ptr);
748 list.add(file_ptr);
749 file_ptr.p->m_create.m_senderRef = ref;
750 file_ptr.p->m_create.m_senderData = data;
751 create_file_abort(signal, ptr, file_ptr);
752 return;
753 }
754
755 free_logbuffer_memory(ptr);
756 m_logfile_group_hash.release(ptr);
757 DropFilegroupImplConf *conf = (DropFilegroupImplConf*)signal->getDataPtr();
758 conf->senderData = data;
759 conf->senderRef = reference();
760 sendSignal(ref, GSN_DROP_FILEGROUP_IMPL_CONF, signal,
761 DropFilegroupImplConf::SignalLength, JBB);
762 }
763
764 void
execCREATE_FILE_IMPL_REQ(Signal * signal)765 Lgman::execCREATE_FILE_IMPL_REQ(Signal* signal)
766 {
767 jamEntry();
768 CreateFileImplReq* req= (CreateFileImplReq*)signal->getDataPtr();
769
770 Uint32 senderRef = req->senderRef;
771 Uint32 senderData = req->senderData;
772 Uint32 requestInfo = req->requestInfo;
773
774 Ptr<Logfile_group> ptr;
775 CreateFileImplRef::ErrorCode err = CreateFileImplRef::NoError;
776 SectionHandle handle(this, signal);
777 do {
778 if (!m_logfile_group_hash.find(ptr, req->filegroup_id))
779 {
780 jam();
781 err = CreateFileImplRef::InvalidFilegroup;
782 break;
783 }
784
785 if (ptr.p->m_version != req->filegroup_version)
786 {
787 jam();
788 err = CreateFileImplRef::InvalidFilegroupVersion;
789 break;
790 }
791
792 Ptr<Undofile> file_ptr;
793 switch(requestInfo){
794 case CreateFileImplReq::Commit:
795 {
796 jam();
797 ndbrequire(find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id));
798 file_ptr.p->m_create.m_senderRef = req->senderRef;
799 file_ptr.p->m_create.m_senderData = req->senderData;
800 create_file_commit(signal, ptr, file_ptr);
801 return;
802 }
803 case CreateFileImplReq::Abort:
804 {
805 Uint32 senderRef = req->senderRef;
806 Uint32 senderData = req->senderData;
807 if (find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id))
808 {
809 jam();
810 file_ptr.p->m_create.m_senderRef = senderRef;
811 file_ptr.p->m_create.m_senderData = senderData;
812 create_file_abort(signal, ptr, file_ptr);
813 }
814 else
815 {
816 CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
817 jam();
818 conf->senderData = senderData;
819 conf->senderRef = reference();
820 sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
821 CreateFileImplConf::SignalLength, JBB);
822 }
823 return;
824 }
825 default: // prepare
826 break;
827 }
828
829 if (!m_file_pool.seize(file_ptr))
830 {
831 jam();
832 err = CreateFileImplRef::OutOfFileRecords;
833 break;
834 }
835
836 if (!handle.m_cnt == 1)
837 {
838 ndbrequire(false);
839 }
840
841 if (ERROR_INSERTED(15000) ||
842 (sizeof(void*) == 4 && req->file_size_hi & 0xFFFFFFFF))
843 {
844 jam();
845 err = CreateFileImplRef::FileSizeTooLarge;
846 break;
847 }
848
849 Uint64 sz = (Uint64(req->file_size_hi) << 32) + req->file_size_lo;
850 if (sz < 1024*1024)
851 {
852 jam();
853 err = CreateFileImplRef::FileSizeTooSmall;
854 break;
855 }
856
857 new (file_ptr.p) Undofile(req, ptr.i);
858
859 Local_undofile_list tmp(m_file_pool, ptr.p->m_meta_files);
860 tmp.add(file_ptr);
861
862 open_file(signal, file_ptr, req->requestInfo, &handle);
863 return;
864 } while(0);
865
866 releaseSections(handle);
867 CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
868 ref->senderData = senderData;
869 ref->senderRef = reference();
870 ref->errorCode = err;
871 sendSignal(senderRef, GSN_CREATE_FILE_IMPL_REF, signal,
872 CreateFileImplRef::SignalLength, JBB);
873 }
874
875 void
open_file(Signal * signal,Ptr<Undofile> ptr,Uint32 requestInfo,SectionHandle * handle)876 Lgman::open_file(Signal* signal, Ptr<Undofile> ptr,
877 Uint32 requestInfo,
878 SectionHandle * handle)
879 {
880 FsOpenReq* req = (FsOpenReq*)signal->getDataPtrSend();
881 req->userReference = reference();
882 req->userPointer = ptr.i;
883
884 memset(req->fileNumber, 0, sizeof(req->fileNumber));
885 FsOpenReq::setVersion(req->fileNumber, 4); // Version 4 = specified filename
886 FsOpenReq::v4_setBasePath(req->fileNumber, FsOpenReq::BP_DD_UF);
887
888 req->fileFlags = 0;
889 req->fileFlags |= FsOpenReq::OM_READWRITE;
890 req->fileFlags |= FsOpenReq::OM_DIRECT;
891 req->fileFlags |= FsOpenReq::OM_SYNC;
892 switch(requestInfo){
893 case CreateFileImplReq::Create:
894 req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE;
895 req->fileFlags |= FsOpenReq::OM_INIT;
896 ptr.p->m_state = Undofile::FS_CREATING;
897 break;
898 case CreateFileImplReq::CreateForce:
899 req->fileFlags |= FsOpenReq::OM_CREATE;
900 req->fileFlags |= FsOpenReq::OM_INIT;
901 ptr.p->m_state = Undofile::FS_CREATING;
902 break;
903 case CreateFileImplReq::Open:
904 req->fileFlags |= FsOpenReq::OM_CHECK_SIZE;
905 ptr.p->m_state = Undofile::FS_OPENING;
906 break;
907 default:
908 ndbrequire(false);
909 }
910
911 req->page_size = File_formats::NDB_PAGE_SIZE;
912 Uint64 size = (Uint64)ptr.p->m_file_size * (Uint64)File_formats::NDB_PAGE_SIZE;
913 req->file_size_hi = (Uint32)(size >> 32);
914 req->file_size_lo = (Uint32)(size & 0xFFFFFFFF);
915
916 sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBB,
917 handle);
918 }
919
920 void
execFSWRITEREQ(Signal * signal)921 Lgman::execFSWRITEREQ(Signal* signal)
922 {
923 jamEntry();
924 Ptr<Undofile> ptr;
925 Ptr<GlobalPage> page_ptr;
926 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtr();
927
928 m_file_pool.getPtr(ptr, req->userPointer);
929 m_shared_page_pool.getPtr(page_ptr, req->data.pageData[0]);
930
931 if (req->varIndex == 0)
932 {
933 File_formats::Undofile::Zero_page* page =
934 (File_formats::Undofile::Zero_page*)page_ptr.p;
935 page->m_page_header.init(File_formats::FT_Undofile,
936 getOwnNodeId(),
937 ndbGetOwnVersion(),
938 (Uint32)time(0));
939 page->m_file_id = ptr.p->m_file_id;
940 page->m_logfile_group_id = ptr.p->m_create.m_logfile_group_id;
941 page->m_logfile_group_version = ptr.p->m_create.m_logfile_group_version;
942 page->m_undo_pages = ptr.p->m_file_size - 1; // minus zero page
943 }
944 else
945 {
946 File_formats::Undofile::Undo_page* page =
947 (File_formats::Undofile::Undo_page*)page_ptr.p;
948 page->m_page_header.m_page_lsn_hi = 0;
949 page->m_page_header.m_page_lsn_lo = 0;
950 page->m_page_header.m_page_type = File_formats::PT_Undopage;
951 page->m_words_used = 0;
952 }
953 }
954
955 void
execFSOPENREF(Signal * signal)956 Lgman::execFSOPENREF(Signal* signal)
957 {
958 jamEntry();
959
960 Ptr<Undofile> ptr;
961 Ptr<Logfile_group> lg_ptr;
962 FsRef* ref = (FsRef*)signal->getDataPtr();
963
964 Uint32 errCode = ref->errorCode;
965 Uint32 osErrCode = ref->osErrorCode;
966
967 m_file_pool.getPtr(ptr, ref->userPointer);
968 m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
969
970 {
971 CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
972 ref->senderData = ptr.p->m_create.m_senderData;
973 ref->senderRef = reference();
974 ref->errorCode = CreateFileImplRef::FileError;
975 ref->fsErrCode = errCode;
976 ref->osErrCode = osErrCode;
977
978 sendSignal(ptr.p->m_create.m_senderRef, GSN_CREATE_FILE_IMPL_REF, signal,
979 CreateFileImplRef::SignalLength, JBB);
980 }
981
982 Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
983 meta.release(ptr);
984 }
985
986 #define HEAD 0
987 #define TAIL 1
988
989 void
execFSOPENCONF(Signal * signal)990 Lgman::execFSOPENCONF(Signal* signal)
991 {
992 jamEntry();
993 Ptr<Undofile> ptr;
994
995 FsConf* conf = (FsConf*)signal->getDataPtr();
996
997 Uint32 fd = conf->filePointer;
998 m_file_pool.getPtr(ptr, conf->userPointer);
999
1000 ptr.p->m_fd = fd;
1001
1002 {
1003 Uint32 senderRef = ptr.p->m_create.m_senderRef;
1004 Uint32 senderData = ptr.p->m_create.m_senderData;
1005
1006 CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
1007 conf->senderData = senderData;
1008 conf->senderRef = reference();
1009 sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
1010 CreateFileImplConf::SignalLength, JBB);
1011 }
1012 }
1013
1014 bool
find_file_by_id(Ptr<Undofile> & ptr,Local_undofile_list::Head & head,Uint32 id)1015 Lgman::find_file_by_id(Ptr<Undofile>& ptr,
1016 Local_undofile_list::Head& head, Uint32 id)
1017 {
1018 Local_undofile_list list(m_file_pool, head);
1019 for(list.first(ptr); !ptr.isNull(); list.next(ptr))
1020 if(ptr.p->m_file_id == id)
1021 return true;
1022 return false;
1023 }
1024
1025 void
create_file_commit(Signal * signal,Ptr<Logfile_group> lg_ptr,Ptr<Undofile> ptr)1026 Lgman::create_file_commit(Signal* signal,
1027 Ptr<Logfile_group> lg_ptr,
1028 Ptr<Undofile> ptr)
1029 {
1030 Uint32 senderRef = ptr.p->m_create.m_senderRef;
1031 Uint32 senderData = ptr.p->m_create.m_senderData;
1032
1033 bool first= false;
1034 if(ptr.p->m_state == Undofile::FS_CREATING &&
1035 (lg_ptr.p->m_state & Logfile_group::LG_ONLINE))
1036 {
1037 jam();
1038 Local_undofile_list free(m_file_pool, lg_ptr.p->m_files);
1039 Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
1040 first= free.isEmpty();
1041 meta.remove(ptr);
1042 if(!first)
1043 {
1044 /**
1045 * Add log file next after current head
1046 */
1047 Ptr<Undofile> curr;
1048 m_file_pool.getPtr(curr, lg_ptr.p->m_file_pos[HEAD].m_ptr_i);
1049 if(free.next(curr))
1050 free.insert(ptr, curr); // inserts before (that's why the extra next)
1051 else
1052 free.add(ptr);
1053
1054 ptr.p->m_state = Undofile::FS_ONLINE | Undofile::FS_EMPTY;
1055 }
1056 else
1057 {
1058 /**
1059 * First file isn't empty as it can be written to at any time
1060 */
1061 free.add(ptr);
1062 ptr.p->m_state = Undofile::FS_ONLINE;
1063 lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
1064 signal->theData[0] = LgmanContinueB::FLUSH_LOG;
1065 signal->theData[1] = lg_ptr.i;
1066 signal->theData[2] = 0;
1067 sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
1068 }
1069 }
1070 else
1071 {
1072 ptr.p->m_state = Undofile::FS_SORTING;
1073 }
1074
1075 ptr.p->m_online.m_lsn = 0;
1076 ptr.p->m_online.m_outstanding = 0;
1077
1078 Uint64 add= ptr.p->m_file_size - 1;
1079 lg_ptr.p->m_free_file_words += add * File_formats::UNDO_PAGE_WORDS;
1080
1081 if(first)
1082 {
1083 jam();
1084
1085 Buffer_idx tmp= { ptr.i, 0 };
1086 lg_ptr.p->m_file_pos[HEAD] = lg_ptr.p->m_file_pos[TAIL] = tmp;
1087
1088 /**
1089 * Init log tail pointer
1090 */
1091 lg_ptr.p->m_tail_pos[0] = tmp;
1092 lg_ptr.p->m_tail_pos[1] = tmp;
1093 lg_ptr.p->m_tail_pos[2] = tmp;
1094 lg_ptr.p->m_next_reply_ptr_i = ptr.i;
1095 }
1096
1097 validate_logfile_group(lg_ptr, "create_file_commit");
1098
1099 CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
1100 conf->senderData = senderData;
1101 conf->senderRef = reference();
1102 sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
1103 CreateFileImplConf::SignalLength, JBB);
1104 }
1105
1106 void
create_file_abort(Signal * signal,Ptr<Logfile_group> lg_ptr,Ptr<Undofile> ptr)1107 Lgman::create_file_abort(Signal* signal,
1108 Ptr<Logfile_group> lg_ptr,
1109 Ptr<Undofile> ptr)
1110 {
1111 if (ptr.p->m_fd == RNIL)
1112 {
1113 ((FsConf*)signal->getDataPtr())->userPointer = ptr.i;
1114 execFSCLOSECONF(signal);
1115 return;
1116 }
1117
1118 FsCloseReq *req= (FsCloseReq*)signal->getDataPtrSend();
1119 req->filePointer = ptr.p->m_fd;
1120 req->userReference = reference();
1121 req->userPointer = ptr.i;
1122 req->fileFlag = 0;
1123 FsCloseReq::setRemoveFileFlag(req->fileFlag, true);
1124
1125 sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal,
1126 FsCloseReq::SignalLength, JBB);
1127 }
1128
1129 void
execFSCLOSECONF(Signal * signal)1130 Lgman::execFSCLOSECONF(Signal* signal)
1131 {
1132 Ptr<Undofile> ptr;
1133 Ptr<Logfile_group> lg_ptr;
1134 Uint32 ptrI = ((FsConf*)signal->getDataPtr())->userPointer;
1135 m_file_pool.getPtr(ptr, ptrI);
1136
1137 Uint32 senderRef = ptr.p->m_create.m_senderRef;
1138 Uint32 senderData = ptr.p->m_create.m_senderData;
1139
1140 m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
1141
1142 if (lg_ptr.p->m_state & Logfile_group::LG_DROPPING)
1143 {
1144 jam();
1145 {
1146 Local_undofile_list list(m_file_pool, lg_ptr.p->m_files);
1147 list.release(ptr);
1148 }
1149 drop_filegroup_drop_files(signal, lg_ptr, senderRef, senderData);
1150 }
1151 else
1152 {
1153 jam();
1154 Local_undofile_list list(m_file_pool, lg_ptr.p->m_meta_files);
1155 list.release(ptr);
1156
1157 CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
1158 conf->senderData = senderData;
1159 conf->senderRef = reference();
1160 sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
1161 CreateFileImplConf::SignalLength, JBB);
1162 }
1163 }
1164
1165 void
execDROP_FILE_IMPL_REQ(Signal * signal)1166 Lgman::execDROP_FILE_IMPL_REQ(Signal* signal)
1167 {
1168 jamEntry();
1169 ndbrequire(false);
1170 }
1171
1172 #define CONSUMER 0
1173 #define PRODUCER 1
1174
Logfile_group(const CreateFilegroupImplReq * req)1175 Lgman::Logfile_group::Logfile_group(const CreateFilegroupImplReq* req)
1176 {
1177 m_logfile_group_id = req->filegroup_id;
1178 m_version = req->filegroup_version;
1179 m_state = LG_ONLINE;
1180 m_outstanding_fs = 0;
1181 m_next_reply_ptr_i = RNIL;
1182
1183 m_last_lsn = 0;
1184 m_last_synced_lsn = 0;
1185 m_last_sync_req_lsn = 0;
1186 m_max_sync_req_lsn = 0;
1187 m_last_read_lsn = 0;
1188 m_file_pos[0].m_ptr_i= m_file_pos[1].m_ptr_i = RNIL;
1189
1190 m_free_file_words = 0;
1191 m_total_buffer_words = 0;
1192 m_free_buffer_words = 0;
1193 m_callback_buffer_words = 0;
1194
1195 m_pos[CONSUMER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
1196 m_pos[CONSUMER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
1197 m_pos[PRODUCER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
1198 m_pos[PRODUCER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
1199
1200 m_tail_pos[2].m_ptr_i= RNIL;
1201 m_tail_pos[2].m_idx= ~0;
1202
1203 m_tail_pos[0] = m_tail_pos[1] = m_tail_pos[2];
1204 }
1205
1206 bool
alloc_logbuffer_memory(Ptr<Logfile_group> ptr,Uint32 bytes)1207 Lgman::alloc_logbuffer_memory(Ptr<Logfile_group> ptr, Uint32 bytes)
1208 {
1209 Uint32 pages= (((bytes + 3) >> 2) + File_formats::NDB_PAGE_SIZE_WORDS - 1)
1210 / File_formats::NDB_PAGE_SIZE_WORDS;
1211 Uint32 requested= pages;
1212 {
1213 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1214 while(pages)
1215 {
1216 Uint32 ptrI;
1217 Uint32 cnt = pages > 64 ? 64 : pages;
1218 m_ctx.m_mm.alloc_pages(RG_DISK_OPERATIONS, &ptrI, &cnt, 1);
1219 if (cnt)
1220 {
1221 Buffer_idx range;
1222 range.m_ptr_i= ptrI;
1223 range.m_idx = cnt;
1224
1225 if (map.append((Uint32*)&range, 2) == false)
1226 {
1227 /**
1228 * Failed to append page-range...
1229 * jump out of alloc routine
1230 */
1231 jam();
1232 m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS,
1233 range.m_ptr_i, range.m_idx);
1234 break;
1235 }
1236 pages -= range.m_idx;
1237 }
1238 else
1239 {
1240 break;
1241 }
1242 }
1243 }
1244
1245 if(pages)
1246 {
1247 /* Could not allocate all of the requested memory.
1248 * So release that already allocated.
1249 */
1250 free_logbuffer_memory(ptr);
1251 return false;
1252 }
1253
1254 #if defined VM_TRACE || defined ERROR_INSERT
1255 ndbout << "DD lgman: fg id:" << ptr.p->m_logfile_group_id << " undo buffer pages/bytes:" << (requested-pages) << "/" << (requested-pages)*File_formats::NDB_PAGE_SIZE << endl;
1256 #endif
1257
1258 init_logbuffer_pointers(ptr);
1259 return true;
1260 }
1261
1262 void
init_logbuffer_pointers(Ptr<Logfile_group> ptr)1263 Lgman::init_logbuffer_pointers(Ptr<Logfile_group> ptr)
1264 {
1265 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1266 Page_map::Iterator it;
1267 union {
1268 Uint32 tmp[2];
1269 Buffer_idx range;
1270 };
1271
1272 map.first(it);
1273 tmp[0] = *it.data;
1274 ndbrequire(map.next(it));
1275 tmp[1] = *it.data;
1276
1277 ptr.p->m_pos[CONSUMER].m_current_page.m_ptr_i = 0; // Index in page map
1278 ptr.p->m_pos[CONSUMER].m_current_page.m_idx = range.m_idx - 1;// left range
1279 ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
1280 ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = 0; // Page pos
1281
1282 ptr.p->m_pos[PRODUCER].m_current_page.m_ptr_i = 0; // Index in page map
1283 ptr.p->m_pos[PRODUCER].m_current_page.m_idx = range.m_idx - 1;// left range
1284 ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
1285 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0; // Page pos
1286
1287 Uint32 pages= range.m_idx;
1288 while(map.next(it))
1289 {
1290 tmp[0] = *it.data;
1291 ndbrequire(map.next(it));
1292 tmp[1] = *it.data;
1293 pages += range.m_idx;
1294 }
1295
1296 ptr.p->m_total_buffer_words =
1297 ptr.p->m_free_buffer_words = pages * File_formats::UNDO_PAGE_WORDS;
1298 }
1299
1300 Uint32
compute_free_file_pages(Ptr<Logfile_group> ptr)1301 Lgman::compute_free_file_pages(Ptr<Logfile_group> ptr)
1302 {
1303 Buffer_idx head= ptr.p->m_file_pos[HEAD];
1304 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1305 Uint32 pages = 0;
1306 if (head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx)
1307 {
1308 pages += tail.m_idx - head.m_idx;
1309 }
1310 else
1311 {
1312 Ptr<Undofile> file;
1313 m_file_pool.getPtr(file, head.m_ptr_i);
1314 Local_undofile_list list(m_file_pool, ptr.p->m_files);
1315
1316 do
1317 {
1318 pages += (file.p->m_file_size - head.m_idx - 1);
1319 if(!list.next(file))
1320 list.first(file);
1321 head.m_idx = 0;
1322 } while(file.i != tail.m_ptr_i);
1323
1324 pages += tail.m_idx - head.m_idx;
1325 }
1326 return pages;
1327 }
1328
1329 void
free_logbuffer_memory(Ptr<Logfile_group> ptr)1330 Lgman::free_logbuffer_memory(Ptr<Logfile_group> ptr)
1331 {
1332 union {
1333 Uint32 tmp[2];
1334 Buffer_idx range;
1335 };
1336
1337 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1338
1339 Page_map::Iterator it;
1340 map.first(it);
1341 while(!it.isNull())
1342 {
1343 tmp[0] = *it.data;
1344 ndbrequire(map.next(it));
1345 tmp[1] = *it.data;
1346
1347 m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS, range.m_ptr_i, range.m_idx);
1348 map.next(it);
1349 }
1350 map.release();
1351 }
1352
Undofile(const struct CreateFileImplReq * req,Uint32 ptrI)1353 Lgman::Undofile::Undofile(const struct CreateFileImplReq* req, Uint32 ptrI)
1354 {
1355 m_fd = RNIL;
1356 m_file_id = req->file_id;
1357 m_logfile_group_ptr_i= ptrI;
1358
1359 Uint64 pages = req->file_size_hi;
1360 pages = (pages << 32) | req->file_size_lo;
1361 pages /= GLOBAL_PAGE_SIZE;
1362 m_file_size = Uint32(pages);
1363 #if defined VM_TRACE || defined ERROR_INSERT
1364 ndbout << "DD lgman: file id:" << m_file_id << " undofile pages/bytes:" << m_file_size << "/" << m_file_size*GLOBAL_PAGE_SIZE << endl;
1365 #endif
1366
1367 m_create.m_senderRef = req->senderRef; // During META
1368 m_create.m_senderData = req->senderData; // During META
1369 m_create.m_logfile_group_id = req->filegroup_id;
1370 }
1371
Logfile_client(SimulatedBlock * block,Lgman * lgman,Uint32 logfile_group_id,bool lock)1372 Logfile_client::Logfile_client(SimulatedBlock* block,
1373 Lgman* lgman, Uint32 logfile_group_id,
1374 bool lock)
1375 {
1376 Uint32 bno = block->number();
1377 Uint32 ino = block->instance();
1378 m_client_block= block;
1379 m_block= numberToBlock(bno, ino);
1380 m_lgman= lgman;
1381 m_lock = lock;
1382 m_logfile_group_id= logfile_group_id;
1383 D("client ctor " << bno << "/" << ino);
1384 if (m_lock)
1385 m_lgman->client_lock(m_block, 0);
1386 }
1387
~Logfile_client()1388 Logfile_client::~Logfile_client()
1389 {
1390 #ifdef VM_TRACE
1391 Uint32 bno = blockToMain(m_block);
1392 Uint32 ino = blockToInstance(m_block);
1393 #endif
1394 D("client dtor " << bno << "/" << ino);
1395 if (m_lock)
1396 m_lgman->client_unlock(m_block, 0);
1397 }
1398
1399 int
sync_lsn(Signal * signal,Uint64 lsn,Request * req,Uint32 flags)1400 Logfile_client::sync_lsn(Signal* signal,
1401 Uint64 lsn, Request* req, Uint32 flags)
1402 {
1403 Ptr<Lgman::Logfile_group> ptr;
1404 if(m_lgman->m_logfile_group_list.first(ptr))
1405 {
1406 if(ptr.p->m_last_synced_lsn >= lsn)
1407 {
1408 return 1;
1409 }
1410
1411 bool empty= false;
1412 Ptr<Lgman::Log_waiter> wait;
1413 {
1414 Lgman::Local_log_waiter_list
1415 list(m_lgman->m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1416
1417 empty= list.isEmpty();
1418 if(!list.seize(wait))
1419 return -1;
1420
1421 wait.p->m_block= m_block;
1422 wait.p->m_sync_lsn= lsn;
1423 memcpy(&wait.p->m_callback, &req->m_callback,
1424 sizeof(SimulatedBlock::CallbackPtr));
1425
1426 ptr.p->m_max_sync_req_lsn = lsn > ptr.p->m_max_sync_req_lsn ?
1427 lsn : ptr.p->m_max_sync_req_lsn;
1428 }
1429
1430 if(ptr.p->m_last_sync_req_lsn < lsn &&
1431 ! (ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD))
1432 {
1433 ptr.p->m_state |= Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1434 signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1435 signal->theData[1] = ptr.i;
1436 signal->theData[2] = (Uint32)(lsn >> 32);
1437 signal->theData[3] = (Uint32)(lsn & 0xFFFFFFFF);
1438 m_client_block->sendSignalWithDelay(m_lgman->reference(),
1439 GSN_CONTINUEB, signal, 10, 4);
1440 }
1441 return 0;
1442 }
1443 return -1;
1444 }
1445
1446 void
force_log_sync(Signal * signal,Ptr<Logfile_group> ptr,Uint32 lsn_hi,Uint32 lsn_lo)1447 Lgman::force_log_sync(Signal* signal,
1448 Ptr<Logfile_group> ptr,
1449 Uint32 lsn_hi, Uint32 lsn_lo)
1450 {
1451 Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1452 Uint64 force_lsn = lsn_hi; force_lsn <<= 32; force_lsn += lsn_lo;
1453
1454 if(ptr.p->m_last_sync_req_lsn < force_lsn)
1455 {
1456 /**
1457 * Do force
1458 */
1459 Buffer_idx pos= ptr.p->m_pos[PRODUCER].m_current_pos;
1460 GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);
1461
1462 Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
1463 if(pos.m_idx) // don't flush empty page...
1464 {
1465 Uint64 lsn= ptr.p->m_last_lsn - 1;
1466
1467 File_formats::Undofile::Undo_page* undo=
1468 (File_formats::Undofile::Undo_page*)page;
1469 undo->m_page_header.m_page_lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
1470 undo->m_page_header.m_page_lsn_hi = (Uint32)(lsn >> 32);
1471 undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1472
1473 /**
1474 * Update free space with extra NOOP
1475 */
1476 ndbrequire(ptr.p->m_free_file_words >= free);
1477 ndbrequire(ptr.p->m_free_buffer_words > free);
1478 ptr.p->m_free_file_words -= free;
1479 ptr.p->m_free_buffer_words -= free;
1480
1481 validate_logfile_group(ptr, "force_log_sync");
1482
1483 next_page(ptr.p, PRODUCER);
1484 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
1485 }
1486 }
1487
1488
1489
1490 Uint64 max_req_lsn = ptr.p->m_max_sync_req_lsn;
1491 if(max_req_lsn > force_lsn &&
1492 max_req_lsn > ptr.p->m_last_sync_req_lsn)
1493 {
1494 ndbrequire(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
1495 signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1496 signal->theData[1] = ptr.i;
1497 signal->theData[2] = (Uint32)(max_req_lsn >> 32);
1498 signal->theData[3] = (Uint32)(max_req_lsn & 0xFFFFFFFF);
1499 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 4);
1500 }
1501 else
1502 {
1503 ptr.p->m_state &= ~(Uint32)Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1504 }
1505 }
1506
1507 void
process_log_sync_waiters(Signal * signal,Ptr<Logfile_group> ptr)1508 Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1509 {
1510 Local_log_waiter_list
1511 list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1512
1513 if(list.isEmpty())
1514 {
1515 return;
1516 }
1517
1518 bool removed= false;
1519 Ptr<Log_waiter> waiter;
1520 list.first(waiter);
1521 Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1522
1523 if(waiter.p->m_sync_lsn <= ptr.p->m_last_synced_lsn)
1524 {
1525 removed= true;
1526 Uint32 block = waiter.p->m_block;
1527 CallbackPtr & callback = waiter.p->m_callback;
1528 sendCallbackConf(signal, block, callback, logfile_group_id);
1529
1530 list.releaseFirst(waiter);
1531 }
1532
1533 if(removed && !list.isEmpty())
1534 {
1535 ptr.p->m_state |= Logfile_group::LG_SYNC_WAITERS_THREAD;
1536 signal->theData[0] = LgmanContinueB::PROCESS_LOG_SYNC_WAITERS;
1537 signal->theData[1] = ptr.i;
1538 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1539 }
1540 else
1541 {
1542 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_SYNC_WAITERS_THREAD;
1543 }
1544 }
1545
1546
1547 Uint32*
get_log_buffer(Ptr<Logfile_group> ptr,Uint32 sz)1548 Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
1549 {
1550 GlobalPage *page;
1551 page=m_shared_page_pool.getPtr(ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i);
1552
1553 Uint32 total_free= ptr.p->m_free_buffer_words;
1554 assert(total_free >= sz);
1555 Uint32 pos= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
1556 Uint32 free= File_formats::UNDO_PAGE_WORDS - pos;
1557
1558 if(sz <= free)
1559 {
1560 next:
1561 // fits this page wo/ problem
1562 ndbrequire(total_free >= sz);
1563 ptr.p->m_free_buffer_words = total_free - sz;
1564 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
1565 return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
1566 }
1567
1568 /**
1569 * It didn't fit page...fill page with a NOOP log entry
1570 */
1571 Uint64 lsn= ptr.p->m_last_lsn - 1;
1572 File_formats::Undofile::Undo_page* undo=
1573 (File_formats::Undofile::Undo_page*)page;
1574 undo->m_page_header.m_page_lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
1575 undo->m_page_header.m_page_lsn_hi = (Uint32)(lsn >> 32);
1576 undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1577
1578 /**
1579 * Update free space with extra NOOP
1580 */
1581 ndbrequire(ptr.p->m_free_file_words >= free);
1582 ptr.p->m_free_file_words -= free;
1583
1584 validate_logfile_group(ptr, "get_log_buffer");
1585
1586 pos= 0;
1587 assert(total_free >= free);
1588 total_free -= free;
1589 page= m_shared_page_pool.getPtr(next_page(ptr.p, PRODUCER));
1590 goto next;
1591 }
1592
1593 Uint32
next_page(Logfile_group * ptrP,Uint32 i)1594 Lgman::next_page(Logfile_group* ptrP, Uint32 i)
1595 {
1596 Uint32 page_ptr_i= ptrP->m_pos[i].m_current_pos.m_ptr_i;
1597 Uint32 left_in_range= ptrP->m_pos[i].m_current_page.m_idx;
1598 if(left_in_range > 0)
1599 {
1600 ptrP->m_pos[i].m_current_page.m_idx = left_in_range - 1;
1601 ptrP->m_pos[i].m_current_pos.m_ptr_i = page_ptr_i + 1;
1602 return page_ptr_i + 1;
1603 }
1604 else
1605 {
1606 Lgman::Page_map map(m_data_buffer_pool, ptrP->m_buffer_pages);
1607 Uint32 pos= (ptrP->m_pos[i].m_current_page.m_ptr_i + 2) % map.getSize();
1608 Lgman::Page_map::Iterator it;
1609 map.position(it, pos);
1610
1611 union {
1612 Uint32 tmp[2];
1613 Lgman::Buffer_idx range;
1614 };
1615
1616 tmp[0] = *it.data; map.next(it);
1617 tmp[1] = *it.data;
1618
1619 ptrP->m_pos[i].m_current_page.m_ptr_i = pos; // New index in map
1620 ptrP->m_pos[i].m_current_page.m_idx = range.m_idx - 1; // Free pages
1621 ptrP->m_pos[i].m_current_pos.m_ptr_i = range.m_ptr_i; // Current page
1622 // No need to set ptrP->m_current_pos.m_idx, that is set "in higher"-func
1623 return range.m_ptr_i;
1624 }
1625 }
1626
1627 int
get_log_buffer(Signal * signal,Uint32 sz,SimulatedBlock::CallbackPtr * callback)1628 Logfile_client::get_log_buffer(Signal* signal, Uint32 sz,
1629 SimulatedBlock::CallbackPtr* callback)
1630 {
1631 sz += 2; // lsn
1632 Lgman::Logfile_group key;
1633 key.m_logfile_group_id= m_logfile_group_id;
1634 Ptr<Lgman::Logfile_group> ptr;
1635 if(m_lgman->m_logfile_group_hash.find(ptr, key))
1636 {
1637 Uint32 callback_buffer = ptr.p->m_callback_buffer_words;
1638 Uint32 free_buffer = ptr.p->m_free_buffer_words;
1639 if (free_buffer >= (sz + callback_buffer + FREE_BUFFER_MARGIN) &&
1640 ptr.p->m_log_buffer_waiters.isEmpty())
1641 {
1642 ptr.p->m_callback_buffer_words = callback_buffer + sz;
1643 return 1;
1644 }
1645
1646 bool empty= false;
1647 {
1648 Ptr<Lgman::Log_waiter> wait;
1649 Lgman::Local_log_waiter_list
1650 list(m_lgman->m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1651
1652 empty= list.isEmpty();
1653 if(!list.seize(wait))
1654 {
1655 return -1;
1656 }
1657
1658 wait.p->m_size= sz;
1659 wait.p->m_block= m_block;
1660 memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::CallbackPtr));
1661 }
1662
1663 return 0;
1664 }
1665 return -1;
1666 }
1667
1668 NdbOut&
operator <<(NdbOut & out,const Lgman::Buffer_idx & pos)1669 operator<<(NdbOut& out, const Lgman::Buffer_idx& pos)
1670 {
1671 out << "[ "
1672 << pos.m_ptr_i << " "
1673 << pos.m_idx << " ]";
1674 return out;
1675 }
1676
1677 NdbOut&
operator <<(NdbOut & out,const Lgman::Logfile_group::Position & pos)1678 operator<<(NdbOut& out, const Lgman::Logfile_group::Position& pos)
1679 {
1680 out << "[ ("
1681 << pos.m_current_page.m_ptr_i << " "
1682 << pos.m_current_page.m_idx << ") ("
1683 << pos.m_current_pos.m_ptr_i << " "
1684 << pos.m_current_pos.m_idx << ") ]";
1685 return out;
1686 }
1687
1688 void
flush_log(Signal * signal,Ptr<Logfile_group> ptr,Uint32 force)1689 Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
1690 {
1691 Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
1692 Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
1693
1694 jamEntry();
1695
1696 if (consumer.m_current_page == producer.m_current_page)
1697 {
1698 jam();
1699 Buffer_idx pos = producer.m_current_pos;
1700
1701 #if 0
1702 if (force)
1703 {
1704 ndbout_c("force: %d ptr.p->m_file_pos[HEAD].m_ptr_i= %x",
1705 force, ptr.p->m_file_pos[HEAD].m_ptr_i);
1706 ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
1707 consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
1708 producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
1709 }
1710 #endif
1711 if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
1712 {
1713 jam();
1714
1715 if (ptr.p->m_log_buffer_waiters.isEmpty() || pos.m_idx == 0)
1716 {
1717 jam();
1718 force = 0;
1719 }
1720 else if (ptr.p->m_free_buffer_words < FREE_BUFFER_MARGIN)
1721 {
1722 jam();
1723 force = 2;
1724 }
1725
1726 if (force < 2 || ptr.p->m_outstanding_fs)
1727 {
1728 jam();
1729 signal->theData[0] = LgmanContinueB::FLUSH_LOG;
1730 signal->theData[1] = ptr.i;
1731 signal->theData[2] = force + 1;
1732 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal,
1733 force ? 10 : 100, 3);
1734 return;
1735 }
1736 else
1737 {
1738 jam();
1739 GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);
1740
1741 Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
1742
1743 ndbout_c("force flush %d %d outstanding: %u isEmpty(): %u",
1744 pos.m_idx, ptr.p->m_free_buffer_words,
1745 ptr.p->m_outstanding_fs,
1746 ptr.p->m_log_buffer_waiters.isEmpty());
1747
1748 ndbrequire(pos.m_idx); // don't flush empty page...
1749 Uint64 lsn= ptr.p->m_last_lsn - 1;
1750
1751 File_formats::Undofile::Undo_page* undo=
1752 (File_formats::Undofile::Undo_page*)page;
1753 undo->m_page_header.m_page_lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
1754 undo->m_page_header.m_page_lsn_hi = (Uint32)(lsn >> 32);
1755 undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1756
1757 /**
1758 * Update free space with extra NOOP
1759 */
1760 ndbrequire(ptr.p->m_free_file_words >= free);
1761 ndbrequire(ptr.p->m_free_buffer_words > free);
1762 ptr.p->m_free_file_words -= free;
1763 ptr.p->m_free_buffer_words -= free;
1764
1765 validate_logfile_group(ptr, "force_log_flush");
1766
1767 next_page(ptr.p, PRODUCER);
1768 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
1769 producer = ptr.p->m_pos[PRODUCER];
1770 // break through
1771 }
1772 }
1773 else
1774 {
1775 jam();
1776 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
1777 return;
1778 }
1779 }
1780
1781 bool full= false;
1782 Uint32 tot= 0;
1783 while(!(consumer.m_current_page == producer.m_current_page) && !full)
1784 {
1785 jam();
1786 validate_logfile_group(ptr, "before flush log");
1787
1788 Uint32 cnt; // pages written
1789 Uint32 page= consumer.m_current_pos.m_ptr_i;
1790 if(consumer.m_current_page.m_ptr_i == producer.m_current_page.m_ptr_i)
1791 {
1792 /**
1793 * In same range
1794 */
1795 jam();
1796
1797 if(producer.m_current_pos.m_ptr_i > page)
1798 {
1799 /**
1800 * producer ahead of consumer in same chunk
1801 */
1802 jam();
1803 Uint32 tmp= producer.m_current_pos.m_ptr_i - page;
1804 cnt= write_log_pages(signal, ptr, page, tmp);
1805 assert(cnt <= tmp);
1806
1807 consumer.m_current_pos.m_ptr_i += cnt;
1808 consumer.m_current_page.m_idx -= cnt;
1809 full= (tmp > cnt);
1810 }
1811 else
1812 {
1813 /**
1814 * consumer ahead of producer in same chunk
1815 */
1816 Uint32 tmp= consumer.m_current_page.m_idx + 1;
1817 cnt= write_log_pages(signal, ptr, page, tmp);
1818 assert(cnt <= tmp);
1819
1820 if(cnt == tmp)
1821 {
1822 jam();
1823 /**
1824 * Entire chunk is written
1825 * move to next
1826 */
1827 ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
1828 next_page(ptr.p, CONSUMER);
1829 consumer = ptr.p->m_pos[CONSUMER];
1830 }
1831 else
1832 {
1833 jam();
1834 /**
1835 * Failed to write entire chunk...
1836 */
1837 full= true;
1838 consumer.m_current_page.m_idx -= cnt;
1839 consumer.m_current_pos.m_ptr_i += cnt;
1840 }
1841 }
1842 }
1843 else
1844 {
1845 Uint32 tmp= consumer.m_current_page.m_idx + 1;
1846 cnt= write_log_pages(signal, ptr, page, tmp);
1847 assert(cnt <= tmp);
1848
1849 if(cnt == tmp)
1850 {
1851 jam();
1852 /**
1853 * Entire chunk is written
1854 * move to next
1855 */
1856 ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
1857 next_page(ptr.p, CONSUMER);
1858 consumer = ptr.p->m_pos[CONSUMER];
1859 }
1860 else
1861 {
1862 jam();
1863 /**
1864 * Failed to write entire chunk...
1865 */
1866 full= true;
1867 consumer.m_current_page.m_idx -= cnt;
1868 consumer.m_current_pos.m_ptr_i += cnt;
1869 }
1870 }
1871
1872 tot += cnt;
1873 if(cnt)
1874 validate_logfile_group(ptr, " after flush_log");
1875 }
1876
1877 ptr.p->m_pos[CONSUMER]= consumer;
1878
1879 if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
1880 {
1881 signal->theData[0] = LgmanContinueB::FLUSH_LOG;
1882 signal->theData[1] = ptr.i;
1883 signal->theData[2] = 0;
1884 sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
1885 }
1886 else
1887 {
1888 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
1889 }
1890 }
1891
1892 void
process_log_buffer_waiters(Signal * signal,Ptr<Logfile_group> ptr)1893 Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1894 {
1895 Uint32 free_buffer= ptr.p->m_free_buffer_words;
1896 Uint32 callback_buffer = ptr.p->m_callback_buffer_words;
1897 Local_log_waiter_list
1898 list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1899
1900 if (list.isEmpty())
1901 {
1902 jam();
1903 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1904 return;
1905 }
1906
1907 bool removed= false;
1908 Ptr<Log_waiter> waiter;
1909 list.first(waiter);
1910 Uint32 sz = waiter.p->m_size;
1911 Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1912 if (sz + callback_buffer + FREE_BUFFER_MARGIN < free_buffer)
1913 {
1914 jam();
1915 removed= true;
1916 Uint32 block = waiter.p->m_block;
1917 CallbackPtr & callback = waiter.p->m_callback;
1918 ptr.p->m_callback_buffer_words += sz;
1919 sendCallbackConf(signal, block, callback, logfile_group_id);
1920
1921 list.releaseFirst(waiter);
1922 }
1923
1924 if (removed && !list.isEmpty())
1925 {
1926 jam();
1927 ptr.p->m_state |= Logfile_group::LG_WAITERS_THREAD;
1928 signal->theData[0] = LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS;
1929 signal->theData[1] = ptr.i;
1930 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1931 }
1932 else
1933 {
1934 jam();
1935 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1936 }
1937 }
1938
1939 #define REALLY_SLOW_FS 0
1940
1941 Uint32
write_log_pages(Signal * signal,Ptr<Logfile_group> ptr,Uint32 pageId,Uint32 in_pages)1942 Lgman::write_log_pages(Signal* signal, Ptr<Logfile_group> ptr,
1943 Uint32 pageId, Uint32 in_pages)
1944 {
1945 assert(in_pages);
1946 Ptr<Undofile> filePtr;
1947 Buffer_idx head= ptr.p->m_file_pos[HEAD];
1948 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1949 m_file_pool.getPtr(filePtr, head.m_ptr_i);
1950
1951 if(filePtr.p->m_online.m_outstanding > 0)
1952 {
1953 jam();
1954 return 0;
1955 }
1956
1957 Uint32 sz= filePtr.p->m_file_size - 1; // skip zero
1958 Uint32 max, pages= in_pages;
1959
1960 if(!(head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx))
1961 {
1962 max= sz - head.m_idx;
1963 }
1964 else
1965 {
1966 max= tail.m_idx - head.m_idx;
1967 }
1968
1969 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
1970 req->filePointer = filePtr.p->m_fd;
1971 req->userReference = reference();
1972 req->userPointer = filePtr.i;
1973 req->varIndex = 1+head.m_idx; // skip zero page
1974 req->numberOfPages = pages;
1975 req->data.pageData[0] = pageId;
1976 req->operationFlag = 0;
1977 FsReadWriteReq::setFormatFlag(req->operationFlag,
1978 FsReadWriteReq::fsFormatSharedPage);
1979
1980 if(max > pages)
1981 {
1982 jam();
1983 max= pages;
1984 head.m_idx += max;
1985 ptr.p->m_file_pos[HEAD] = head;
1986
1987 if (REALLY_SLOW_FS)
1988 sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
1989 FsReadWriteReq::FixedLength + 1);
1990 else
1991 sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
1992 FsReadWriteReq::FixedLength + 1, JBA);
1993
1994 ptr.p->m_outstanding_fs++;
1995 filePtr.p->m_online.m_outstanding = max;
1996 filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
1997
1998 File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*)
1999 m_shared_page_pool.getPtr(pageId + max - 1);
2000 Uint64 lsn = 0;
2001 lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
2002 lsn += page->m_page_header.m_page_lsn_lo;
2003
2004 filePtr.p->m_online.m_lsn = lsn; // Store last writereq lsn on file
2005 ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group
2006 }
2007 else
2008 {
2009 jam();
2010 req->numberOfPages = max;
2011 FsReadWriteReq::setSyncFlag(req->operationFlag, 1);
2012
2013 if (REALLY_SLOW_FS)
2014 sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
2015 FsReadWriteReq::FixedLength + 1);
2016 else
2017 sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
2018 FsReadWriteReq::FixedLength + 1, JBA);
2019
2020 ptr.p->m_outstanding_fs++;
2021 filePtr.p->m_online.m_outstanding = max;
2022 filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
2023
2024 File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*)
2025 m_shared_page_pool.getPtr(pageId + max - 1);
2026 Uint64 lsn = 0;
2027 lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
2028 lsn += page->m_page_header.m_page_lsn_lo;
2029
2030 filePtr.p->m_online.m_lsn = lsn; // Store last writereq lsn on file
2031 ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group
2032
2033 Ptr<Undofile> next = filePtr;
2034 Local_undofile_list files(m_file_pool, ptr.p->m_files);
2035 if(!files.next(next))
2036 {
2037 jam();
2038 files.first(next);
2039 }
2040 ndbout_c("changing file from %d to %d", filePtr.i, next.i);
2041 filePtr.p->m_state |= Undofile::FS_MOVE_NEXT;
2042 next.p->m_state &= ~(Uint32)Undofile::FS_EMPTY;
2043
2044 head.m_idx= 0;
2045 head.m_ptr_i= next.i;
2046 ptr.p->m_file_pos[HEAD] = head;
2047 if(max < pages)
2048 max += write_log_pages(signal, ptr, pageId + max, pages - max);
2049 }
2050
2051 assert(max);
2052 return max;
2053 }
2054
2055 void
execFSWRITEREF(Signal * signal)2056 Lgman::execFSWRITEREF(Signal* signal)
2057 {
2058 jamEntry();
2059 SimulatedBlock::execFSWRITEREF(signal);
2060 ndbrequire(false);
2061 }
2062
2063 void
execFSWRITECONF(Signal * signal)2064 Lgman::execFSWRITECONF(Signal* signal)
2065 {
2066 jamEntry();
2067 client_lock(number(), __LINE__);
2068 FsConf * conf = (FsConf*)signal->getDataPtr();
2069 Ptr<Undofile> ptr;
2070 m_file_pool.getPtr(ptr, conf->userPointer);
2071
2072 ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
2073 ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
2074
2075 Ptr<Logfile_group> lg_ptr;
2076 m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
2077
2078 Uint32 cnt= lg_ptr.p->m_outstanding_fs;
2079 ndbrequire(cnt);
2080
2081 if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
2082 {
2083 Uint32 tot= 0;
2084 Uint64 lsn = 0;
2085 {
2086 Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2087 while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
2088 {
2089 Uint32 state= ptr.p->m_state;
2090 Uint32 pages= ptr.p->m_online.m_outstanding;
2091 ndbrequire(pages);
2092 ptr.p->m_online.m_outstanding= 0;
2093 ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
2094 tot += pages;
2095 cnt--;
2096
2097 lsn = ptr.p->m_online.m_lsn;
2098
2099 if((state & Undofile::FS_MOVE_NEXT) && !files.next(ptr))
2100 files.first(ptr);
2101 }
2102 }
2103
2104 ndbassert(tot);
2105 lg_ptr.p->m_outstanding_fs = cnt;
2106 lg_ptr.p->m_free_buffer_words += (tot * File_formats::UNDO_PAGE_WORDS);
2107 lg_ptr.p->m_next_reply_ptr_i = ptr.i;
2108 lg_ptr.p->m_last_synced_lsn = lsn;
2109
2110 if(! (lg_ptr.p->m_state & Logfile_group::LG_SYNC_WAITERS_THREAD))
2111 {
2112 process_log_sync_waiters(signal, lg_ptr);
2113 }
2114
2115 if(! (lg_ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
2116 {
2117 process_log_buffer_waiters(signal, lg_ptr);
2118 }
2119 }
2120 else
2121 {
2122 ndbout_c("miss matched writes");
2123 }
2124 client_unlock(number(), __LINE__);
2125
2126 return;
2127 }
2128
2129 void
execLCP_FRAG_ORD(Signal * signal)2130 Lgman::execLCP_FRAG_ORD(Signal* signal)
2131 {
2132 jamEntry();
2133 client_lock(number(), __LINE__);
2134 exec_lcp_frag_ord(signal, this);
2135 client_unlock(number(), __LINE__);
2136 }
2137
2138 void
exec_lcp_frag_ord(Signal * signal,SimulatedBlock * client_block)2139 Lgman::exec_lcp_frag_ord(Signal* signal, SimulatedBlock* client_block)
2140 {
2141 jamEntry();
2142
2143 LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
2144 Uint32 lcp_id= ord->lcpId;
2145 Uint32 frag_id = ord->fragmentId;
2146 Uint32 table_id = ord->tableId;
2147
2148 Ptr<Logfile_group> ptr;
2149 m_logfile_group_list.first(ptr);
2150
2151 Uint32 entry= lcp_id == m_latest_lcp ?
2152 File_formats::Undofile::UNDO_LCP : File_formats::Undofile::UNDO_LCP_FIRST;
2153 if(!ptr.isNull() && ! (ptr.p->m_state & Logfile_group::LG_CUT_LOG_THREAD))
2154 {
2155 jam();
2156 ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
2157 signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
2158 signal->theData[1] = ptr.i;
2159 client_block->sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2160 }
2161
2162 if(!ptr.isNull() && ptr.p->m_last_lsn)
2163 {
2164 Uint32 undo[3];
2165 undo[0] = lcp_id;
2166 undo[1] = (table_id << 16) | frag_id;
2167 undo[2] = (entry << 16 ) | (sizeof(undo) >> 2);
2168
2169 Uint64 last_lsn= m_last_lsn;
2170
2171 if(ptr.p->m_last_lsn == last_lsn
2172 #ifdef VM_TRACE
2173 && ((rand() % 100) > 50)
2174 #endif
2175 )
2176 {
2177 undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
2178 Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
2179 memcpy(dst, undo, sizeof(undo));
2180 ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
2181 ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
2182 }
2183 else
2184 {
2185 Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
2186 * dst++ = (Uint32)(last_lsn >> 32);
2187 * dst++ = (Uint32)(last_lsn & 0xFFFFFFFF);
2188 memcpy(dst, undo, sizeof(undo));
2189 ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
2190 ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
2191 }
2192 ptr.p->m_last_lcp_lsn = last_lsn;
2193 m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
2194
2195 validate_logfile_group(ptr, "execLCP_FRAG_ORD");
2196 }
2197
2198 while(!ptr.isNull())
2199 {
2200 if (ptr.p->m_last_lsn)
2201 {
2202 /**
2203 * First LCP_FRAGORD for each LCP, sets tail pos
2204 */
2205 if(m_latest_lcp != lcp_id)
2206 {
2207 ptr.p->m_tail_pos[0] = ptr.p->m_tail_pos[1];
2208 ptr.p->m_tail_pos[1] = ptr.p->m_tail_pos[2];
2209 ptr.p->m_tail_pos[2] = ptr.p->m_file_pos[HEAD];
2210 }
2211
2212 if(0)
2213 ndbout_c
2214 ("execLCP_FRAG_ORD (%d %d) (%d %d) (%d %d) free pages: %ld",
2215 ptr.p->m_tail_pos[0].m_ptr_i, ptr.p->m_tail_pos[0].m_idx,
2216 ptr.p->m_tail_pos[1].m_ptr_i, ptr.p->m_tail_pos[1].m_idx,
2217 ptr.p->m_tail_pos[2].m_ptr_i, ptr.p->m_tail_pos[2].m_idx,
2218 (long) (ptr.p->m_free_file_words / File_formats::UNDO_PAGE_WORDS));
2219 }
2220 m_logfile_group_list.next(ptr);
2221 }
2222
2223 m_latest_lcp = lcp_id;
2224 }
2225
2226 void
execEND_LCP_REQ(Signal * signal)2227 Lgman::execEND_LCP_REQ(Signal* signal)
2228 {
2229 EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
2230 ndbrequire(m_latest_lcp == req->backupId);
2231 m_end_lcp_senderdata = req->senderData;
2232
2233 Ptr<Logfile_group> ptr;
2234 m_logfile_group_list.first(ptr);
2235 bool wait= false;
2236 while(!ptr.isNull())
2237 {
2238 Uint64 lcp_lsn = ptr.p->m_last_lcp_lsn;
2239 if(ptr.p->m_last_synced_lsn < lcp_lsn)
2240 {
2241 wait= true;
2242 if(signal->getSendersBlockRef() != reference())
2243 {
2244 D("Logfile_client - execEND_LCP_REQ");
2245 Logfile_client tmp(this, this, ptr.p->m_logfile_group_id);
2246 Logfile_client::Request req;
2247 req.m_callback.m_callbackData = ptr.i;
2248 req.m_callback.m_callbackIndex = ENDLCP_CALLBACK;
2249 ndbrequire(tmp.sync_lsn(signal, lcp_lsn, &req, 0) == 0);
2250 }
2251 }
2252 else
2253 {
2254 ptr.p->m_last_lcp_lsn = 0;
2255 }
2256 m_logfile_group_list.next(ptr);
2257 }
2258
2259 if(wait)
2260 {
2261 return;
2262 }
2263
2264 EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
2265 conf->senderData = m_end_lcp_senderdata;
2266 conf->senderRef = reference();
2267 sendSignal(DBLQH_REF, GSN_END_LCP_CONF,
2268 signal, EndLcpConf::SignalLength, JBB);
2269 }
2270
2271 void
endlcp_callback(Signal * signal,Uint32 ptr,Uint32 res)2272 Lgman::endlcp_callback(Signal* signal, Uint32 ptr, Uint32 res)
2273 {
2274 EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
2275 req->backupId = m_latest_lcp;
2276 req->senderData = m_end_lcp_senderdata;
2277 execEND_LCP_REQ(signal);
2278 }
2279
2280 void
cut_log_tail(Signal * signal,Ptr<Logfile_group> ptr)2281 Lgman::cut_log_tail(Signal* signal, Ptr<Logfile_group> ptr)
2282 {
2283 bool done= true;
2284 if (likely(ptr.p->m_last_lsn))
2285 {
2286 Buffer_idx tmp= ptr.p->m_tail_pos[0];
2287 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
2288
2289 Ptr<Undofile> filePtr;
2290 m_file_pool.getPtr(filePtr, tail.m_ptr_i);
2291
2292 if(!(tmp == tail))
2293 {
2294 Uint32 free;
2295 if(tmp.m_ptr_i == tail.m_ptr_i && tail.m_idx < tmp.m_idx)
2296 {
2297 free= tmp.m_idx - tail.m_idx;
2298 ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;
2299 ptr.p->m_file_pos[TAIL] = tmp;
2300 }
2301 else
2302 {
2303 free= filePtr.p->m_file_size - tail.m_idx - 1;
2304 ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;
2305
2306 Ptr<Undofile> next = filePtr;
2307 Local_undofile_list files(m_file_pool, ptr.p->m_files);
2308 while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY))
2309 ndbrequire(next.i != filePtr.i);
2310 if(next.isNull())
2311 {
2312 jam();
2313 files.first(next);
2314 while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next))
2315 ndbrequire(next.i != filePtr.i);
2316 }
2317
2318 tmp.m_idx= 0;
2319 tmp.m_ptr_i= next.i;
2320 ptr.p->m_file_pos[TAIL] = tmp;
2321 done= false;
2322 }
2323 }
2324
2325 validate_logfile_group(ptr, "cut log");
2326 }
2327
2328 if (done)
2329 {
2330 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_CUT_LOG_THREAD;
2331 m_logfile_group_list.next(ptr);
2332 }
2333
2334 if(!done || !ptr.isNull())
2335 {
2336 ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
2337 signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
2338 signal->theData[1] = ptr.i;
2339 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2340 }
2341 }
2342
2343 void
execSUB_GCP_COMPLETE_REP(Signal * signal)2344 Lgman::execSUB_GCP_COMPLETE_REP(Signal* signal)
2345 {
2346 jamEntry();
2347
2348 Ptr<Logfile_group> ptr;
2349 m_logfile_group_list.first(ptr);
2350
2351 /**
2352 * Filter all logfile groups in parallell
2353 */
2354 return; // NOT IMPLETMENT YET
2355
2356 signal->theData[0] = LgmanContinueB::FILTER_LOG;
2357 while(!ptr.isNull())
2358 {
2359 signal->theData[1] = ptr.i;
2360 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2361 m_logfile_group_list.next(ptr);
2362 }
2363 }
2364
2365 int
alloc_log_space(Uint32 ref,Uint32 words)2366 Lgman::alloc_log_space(Uint32 ref, Uint32 words)
2367 {
2368 ndbrequire(words);
2369 words += 2; // lsn
2370 Logfile_group key;
2371 key.m_logfile_group_id= ref;
2372 Ptr<Logfile_group> ptr;
2373 if(m_logfile_group_hash.find(ptr, key) &&
2374 ptr.p->m_free_file_words >= (words + (4 * File_formats::UNDO_PAGE_WORDS)))
2375 {
2376 ptr.p->m_free_file_words -= words;
2377 validate_logfile_group(ptr, "alloc_log_space");
2378 return 0;
2379 }
2380
2381 if(ptr.isNull())
2382 {
2383 return -1;
2384 }
2385
2386 return 1501;
2387 }
2388
2389 int
free_log_space(Uint32 ref,Uint32 words)2390 Lgman::free_log_space(Uint32 ref, Uint32 words)
2391 {
2392 ndbrequire(words);
2393 Logfile_group key;
2394 key.m_logfile_group_id= ref;
2395 Ptr<Logfile_group> ptr;
2396 if(m_logfile_group_hash.find(ptr, key))
2397 {
2398 ptr.p->m_free_file_words += (words + 2);
2399 validate_logfile_group(ptr, "free_log_space");
2400 return 0;
2401 }
2402 ndbrequire(false);
2403 return -1;
2404 }
2405
2406 Uint64
add_entry(const Change * src,Uint32 cnt)2407 Logfile_client::add_entry(const Change* src, Uint32 cnt)
2408 {
2409 Uint32 i, tot= 0;
2410 for(i= 0; i<cnt; i++)
2411 {
2412 tot += src[i].len;
2413 }
2414
2415 Uint32 *dst;
2416 Uint64 last_lsn= m_lgman->m_last_lsn;
2417 {
2418 Lgman::Logfile_group key;
2419 key.m_logfile_group_id= m_logfile_group_id;
2420 Ptr<Lgman::Logfile_group> ptr;
2421 if(m_lgman->m_logfile_group_hash.find(ptr, key))
2422 {
2423 Uint32 callback_buffer = ptr.p->m_callback_buffer_words;
2424 Uint64 last_lsn_filegroup= ptr.p->m_last_lsn;
2425 if(last_lsn_filegroup == last_lsn
2426 #ifdef VM_TRACE
2427 && ((rand() % 100) > 50)
2428 #endif
2429 )
2430 {
2431 dst= m_lgman->get_log_buffer(ptr, tot);
2432 for(i= 0; i<cnt; i++)
2433 {
2434 memcpy(dst, src[i].ptr, 4*src[i].len);
2435 dst += src[i].len;
2436 }
2437 * (dst - 1) |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
2438 ptr.p->m_free_file_words += 2;
2439 m_lgman->validate_logfile_group(ptr);
2440 }
2441 else
2442 {
2443 dst= m_lgman->get_log_buffer(ptr, tot + 2);
2444 * dst++ = (Uint32)(last_lsn >> 32);
2445 * dst++ = (Uint32)(last_lsn & 0xFFFFFFFF);
2446 for(i= 0; i<cnt; i++)
2447 {
2448 memcpy(dst, src[i].ptr, 4*src[i].len);
2449 dst += src[i].len;
2450 }
2451 }
2452 /**
2453 * for callback_buffer, always allocats 2 extra...
2454 * not knowing if LSN must be added or not
2455 */
2456 tot += 2;
2457
2458 if (unlikely(! (tot <= callback_buffer)))
2459 {
2460 abort();
2461 }
2462 ptr.p->m_callback_buffer_words = callback_buffer - tot;
2463 }
2464
2465 m_lgman->m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
2466
2467 return last_lsn;
2468 }
2469 }
2470
2471 void
execSTART_RECREQ(Signal * signal)2472 Lgman::execSTART_RECREQ(Signal* signal)
2473 {
2474 m_latest_lcp = signal->theData[0];
2475
2476 Ptr<Logfile_group> ptr;
2477 m_logfile_group_list.first(ptr);
2478
2479 if(ptr.i != RNIL)
2480 {
2481 infoEvent("Applying undo to LCP: %d", m_latest_lcp);
2482 ndbout_c("Applying undo to LCP: %d", m_latest_lcp);
2483 find_log_head(signal, ptr);
2484 return;
2485 }
2486
2487 signal->theData[0] = reference();
2488 sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
2489 }
2490
2491 void
find_log_head(Signal * signal,Ptr<Logfile_group> ptr)2492 Lgman::find_log_head(Signal* signal, Ptr<Logfile_group> ptr)
2493 {
2494 ndbrequire(ptr.p->m_state &
2495 (Logfile_group::LG_STARTING | Logfile_group::LG_SORTING));
2496
2497 if(ptr.p->m_meta_files.isEmpty() && ptr.p->m_files.isEmpty())
2498 {
2499 jam();
2500 /**
2501 * Logfile_group wo/ any files
2502 */
2503 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_STARTING;
2504 ptr.p->m_state |= Logfile_group::LG_ONLINE;
2505 m_logfile_group_list.next(ptr);
2506 signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
2507 signal->theData[1] = ptr.i;
2508 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2509 return;
2510 }
2511
2512 ptr.p->m_state = Logfile_group::LG_SORTING;
2513
2514 /**
2515 * Read first page from each undofile (1 file at a time...)
2516 */
2517 Local_undofile_list files(m_file_pool, ptr.p->m_meta_files);
2518 Ptr<Undofile> file_ptr;
2519 files.first(file_ptr);
2520
2521 if(!file_ptr.isNull())
2522 {
2523 /**
2524 * Use log buffer memory when reading
2525 */
2526 Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2527 file_ptr.p->m_online.m_outstanding= page_id;
2528
2529 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2530 req->filePointer = file_ptr.p->m_fd;
2531 req->userReference = reference();
2532 req->userPointer = file_ptr.i;
2533 req->varIndex = 1; // skip zero page
2534 req->numberOfPages = 1;
2535 req->data.pageData[0] = page_id;
2536 req->operationFlag = 0;
2537 FsReadWriteReq::setFormatFlag(req->operationFlag,
2538 FsReadWriteReq::fsFormatSharedPage);
2539
2540 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2541 FsReadWriteReq::FixedLength + 1, JBA);
2542
2543 ptr.p->m_outstanding_fs++;
2544 file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2545 return;
2546 }
2547 else
2548 {
2549 /**
2550 * All files have read first page
2551 * and m_files is sorted acording to lsn
2552 */
2553 ndbrequire(!ptr.p->m_files.isEmpty());
2554 Local_undofile_list read_files(m_file_pool, ptr.p->m_files);
2555 read_files.last(file_ptr);
2556
2557
2558 /**
2559 * Init binary search
2560 */
2561 ptr.p->m_state = Logfile_group::LG_SEARCHING;
2562 file_ptr.p->m_state = Undofile::FS_SEARCHING;
2563 ptr.p->m_file_pos[TAIL].m_idx = 1; // left page
2564 ptr.p->m_file_pos[HEAD].m_idx = file_ptr.p->m_file_size;
2565 ptr.p->m_file_pos[HEAD].m_ptr_i = ((file_ptr.p->m_file_size - 1) >> 1) + 1;
2566
2567 Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2568 file_ptr.p->m_online.m_outstanding= page_id;
2569
2570 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2571 req->filePointer = file_ptr.p->m_fd;
2572 req->userReference = reference();
2573 req->userPointer = file_ptr.i;
2574 req->varIndex = ptr.p->m_file_pos[HEAD].m_ptr_i;
2575 req->numberOfPages = 1;
2576 req->data.pageData[0] = page_id;
2577 req->operationFlag = 0;
2578 FsReadWriteReq::setFormatFlag(req->operationFlag,
2579 FsReadWriteReq::fsFormatSharedPage);
2580
2581 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2582 FsReadWriteReq::FixedLength + 1, JBA);
2583
2584 ptr.p->m_outstanding_fs++;
2585 file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2586 return;
2587 }
2588 }
2589
2590 void
execFSREADCONF(Signal * signal)2591 Lgman::execFSREADCONF(Signal* signal)
2592 {
2593 jamEntry();
2594 client_lock(number(), __LINE__);
2595
2596 Ptr<Undofile> ptr;
2597 Ptr<Logfile_group> lg_ptr;
2598 FsConf* conf = (FsConf*)signal->getDataPtr();
2599
2600 m_file_pool.getPtr(ptr, conf->userPointer);
2601 m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
2602
2603 ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
2604 ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
2605
2606 Uint32 cnt= lg_ptr.p->m_outstanding_fs;
2607 ndbrequire(cnt);
2608
2609 if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING)
2610 {
2611 jam();
2612
2613 if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
2614 {
2615 Uint32 tot= 0;
2616 Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2617 while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
2618 {
2619 Uint32 state= ptr.p->m_state;
2620 Uint32 pages= ptr.p->m_online.m_outstanding;
2621 ndbrequire(pages);
2622 ptr.p->m_online.m_outstanding= 0;
2623 ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
2624 tot += pages;
2625 cnt--;
2626
2627 if((state & Undofile::FS_MOVE_NEXT) && !files.prev(ptr))
2628 files.last(ptr);
2629 }
2630
2631 lg_ptr.p->m_outstanding_fs = cnt;
2632 lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx += tot;
2633 lg_ptr.p->m_next_reply_ptr_i = ptr.i;
2634 }
2635 client_unlock(number(), __LINE__);
2636 return;
2637 }
2638
2639 lg_ptr.p->m_outstanding_fs = cnt - 1;
2640
2641 Ptr<GlobalPage> page_ptr;
2642 m_shared_page_pool.getPtr(page_ptr, ptr.p->m_online.m_outstanding);
2643 ptr.p->m_online.m_outstanding= 0;
2644
2645 File_formats::Undofile::Undo_page* page =
2646 (File_formats::Undofile::Undo_page*)page_ptr.p;
2647
2648 Uint64 lsn = 0;
2649 lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
2650 lsn += page->m_page_header.m_page_lsn_lo;
2651
2652 switch(ptr.p->m_state){
2653 case Undofile::FS_SORTING:
2654 jam();
2655 break;
2656 case Undofile::FS_SEARCHING:
2657 jam();
2658 find_log_head_in_file(signal, lg_ptr, ptr, lsn);
2659 client_unlock(number(), __LINE__);
2660 return;
2661 default:
2662 case Undofile::FS_EXECUTING:
2663 case Undofile::FS_CREATING:
2664 case Undofile::FS_DROPPING:
2665 case Undofile::FS_ONLINE:
2666 case Undofile::FS_OPENING:
2667 case Undofile::FS_EMPTY:
2668 jam();
2669 ndbrequire(false);
2670 }
2671
2672 /**
2673 * Prepare for execution
2674 */
2675 ptr.p->m_state = Undofile::FS_EXECUTING;
2676 ptr.p->m_online.m_lsn = lsn;
2677
2678 /**
2679 * Insert into m_files
2680 */
2681 {
2682 Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
2683 Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2684 meta.remove(ptr);
2685
2686 Ptr<Undofile> loop;
2687 files.first(loop);
2688 while(!loop.isNull() && loop.p->m_online.m_lsn <= lsn)
2689 files.next(loop);
2690
2691 if(loop.isNull())
2692 {
2693 /**
2694 * File has highest lsn, add last
2695 */
2696 jam();
2697 files.add(ptr);
2698 }
2699 else
2700 {
2701 /**
2702 * Insert file in correct position in file list
2703 */
2704 files.insert(ptr, loop);
2705 }
2706 }
2707 find_log_head(signal, lg_ptr);
2708 client_unlock(number(), __LINE__);
2709 }
2710
2711 void
execFSREADREF(Signal * signal)2712 Lgman::execFSREADREF(Signal* signal)
2713 {
2714 jamEntry();
2715 SimulatedBlock::execFSREADREF(signal);
2716 ndbrequire(false);
2717 }
2718
2719 void
find_log_head_in_file(Signal * signal,Ptr<Logfile_group> ptr,Ptr<Undofile> file_ptr,Uint64 last_lsn)2720 Lgman::find_log_head_in_file(Signal* signal,
2721 Ptr<Logfile_group> ptr,
2722 Ptr<Undofile> file_ptr,
2723 Uint64 last_lsn)
2724 {
2725 // a b
2726 // 3 4 5 0 1
2727 Uint32 curr= ptr.p->m_file_pos[HEAD].m_ptr_i;
2728 Uint32 head= ptr.p->m_file_pos[HEAD].m_idx;
2729 Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx;
2730
2731 ndbrequire(head > tail);
2732 Uint32 diff = head - tail;
2733
2734 if(DEBUG_SEARCH_LOG_HEAD)
2735 printf("tail: %d(%lld) head: %d last: %d(%lld) -> ",
2736 tail, file_ptr.p->m_online.m_lsn,
2737 head, curr, last_lsn);
2738 if(last_lsn > file_ptr.p->m_online.m_lsn)
2739 {
2740 if(DEBUG_SEARCH_LOG_HEAD)
2741 printf("moving tail ");
2742
2743 file_ptr.p->m_online.m_lsn = last_lsn;
2744 ptr.p->m_file_pos[TAIL].m_idx = tail = curr;
2745 }
2746 else
2747 {
2748 if(DEBUG_SEARCH_LOG_HEAD)
2749 printf("moving head ");
2750
2751 ptr.p->m_file_pos[HEAD].m_idx = head = curr;
2752 }
2753
2754 if(diff > 1)
2755 {
2756 // We need to find more pages to be sure...
2757 ptr.p->m_file_pos[HEAD].m_ptr_i = curr = ((head + tail) >> 1);
2758
2759 if(DEBUG_SEARCH_LOG_HEAD)
2760 ndbout_c("-> new search tail: %d(%lld) head: %d -> %d",
2761 tail, file_ptr.p->m_online.m_lsn,
2762 head, curr);
2763
2764 Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2765 file_ptr.p->m_online.m_outstanding= page_id;
2766
2767 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2768 req->filePointer = file_ptr.p->m_fd;
2769 req->userReference = reference();
2770 req->userPointer = file_ptr.i;
2771 req->varIndex = curr;
2772 req->numberOfPages = 1;
2773 req->data.pageData[0] = page_id;
2774 req->operationFlag = 0;
2775 FsReadWriteReq::setFormatFlag(req->operationFlag,
2776 FsReadWriteReq::fsFormatSharedPage);
2777
2778 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2779 FsReadWriteReq::FixedLength + 1, JBA);
2780
2781 ptr.p->m_outstanding_fs++;
2782 file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2783 return;
2784 }
2785
2786 ndbrequire(diff == 1);
2787 if(DEBUG_SEARCH_LOG_HEAD)
2788 ndbout_c("-> found last page: %d", tail);
2789
2790 ptr.p->m_state = 0;
2791 file_ptr.p->m_state = Undofile::FS_EXECUTING;
2792 ptr.p->m_last_lsn = file_ptr.p->m_online.m_lsn;
2793 ptr.p->m_last_read_lsn = file_ptr.p->m_online.m_lsn;
2794 ptr.p->m_last_synced_lsn = file_ptr.p->m_online.m_lsn;
2795 m_last_lsn = file_ptr.p->m_online.m_lsn;
2796
2797 /**
2798 * Set HEAD position
2799 */
2800 ptr.p->m_file_pos[HEAD].m_ptr_i = file_ptr.i;
2801 ptr.p->m_file_pos[HEAD].m_idx = tail;
2802
2803 ptr.p->m_file_pos[TAIL].m_ptr_i = file_ptr.i;
2804 ptr.p->m_file_pos[TAIL].m_idx = tail - 1;
2805 ptr.p->m_next_reply_ptr_i = file_ptr.i;
2806
2807 {
2808 Local_undofile_list files(m_file_pool, ptr.p->m_files);
2809 if(tail == 1)
2810 {
2811 /**
2812 * HEAD is first page in a file...
2813 * -> PREV should be in previous file
2814 */
2815 Ptr<Undofile> prev = file_ptr;
2816 if(!files.prev(prev))
2817 {
2818 files.last(prev);
2819 }
2820 ptr.p->m_file_pos[TAIL].m_ptr_i = prev.i;
2821 ptr.p->m_file_pos[TAIL].m_idx = prev.p->m_file_size - 1;
2822 ptr.p->m_next_reply_ptr_i = prev.i;
2823 }
2824
2825 SimulatedBlock* fs = globalData.getBlock(NDBFS);
2826 infoEvent("Undo head - %s page: %d lsn: %lld",
2827 fs->get_filename(file_ptr.p->m_fd),
2828 tail, file_ptr.p->m_online.m_lsn);
2829 g_eventLogger->info("Undo head - %s page: %d lsn: %lld",
2830 fs->get_filename(file_ptr.p->m_fd),
2831 tail, file_ptr.p->m_online.m_lsn);
2832
2833 for(files.prev(file_ptr); !file_ptr.isNull(); files.prev(file_ptr))
2834 {
2835 infoEvent(" - next - %s(%lld)",
2836 fs->get_filename(file_ptr.p->m_fd),
2837 file_ptr.p->m_online.m_lsn);
2838
2839 g_eventLogger->info(" - next - %s(%lld)",
2840 fs->get_filename(file_ptr.p->m_fd),
2841 file_ptr.p->m_online.m_lsn);
2842 }
2843 }
2844
2845 /**
2846 * Start next logfile group
2847 */
2848 m_logfile_group_list.next(ptr);
2849 signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
2850 signal->theData[1] = ptr.i;
2851 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2852 }
2853
2854 void
init_run_undo_log(Signal * signal)2855 Lgman::init_run_undo_log(Signal* signal)
2856 {
2857 /**
2858 * Perform initial sorting of logfile groups
2859 */
2860 Ptr<Logfile_group> group;
2861 Logfile_group_list& list= m_logfile_group_list;
2862 Logfile_group_list tmp(m_logfile_group_pool);
2863
2864 bool found_any = false;
2865
2866 list.first(group);
2867 while(!group.isNull())
2868 {
2869 Ptr<Logfile_group> ptr= group;
2870 list.next(group);
2871 list.remove(ptr);
2872
2873 if (ptr.p->m_state & Logfile_group::LG_ONLINE)
2874 {
2875 /**
2876 * No logfiles in group
2877 */
2878 jam();
2879 tmp.addLast(ptr);
2880 continue;
2881 }
2882
2883 found_any = true;
2884
2885 {
2886 /**
2887 * Init buffer pointers
2888 */
2889 ptr.p->m_free_buffer_words -= File_formats::UNDO_PAGE_WORDS;
2890 ptr.p->m_pos[CONSUMER].m_current_page.m_idx = 0; // 0 more pages read
2891 ptr.p->m_pos[PRODUCER].m_current_page.m_idx = 0; // 0 more pages read
2892
2893 Uint32 page = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2894 File_formats::Undofile::Undo_page* pageP =
2895 (File_formats::Undofile::Undo_page*)m_shared_page_pool.getPtr(page);
2896
2897 ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = pageP->m_words_used;
2898 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 1;
2899 ptr.p->m_last_read_lsn++;
2900 }
2901
2902 /**
2903 * Start producer thread
2904 */
2905 signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2906 signal->theData[1] = ptr.i;
2907 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2908
2909 /**
2910 * Insert in correct postion in list of logfile_group's
2911 */
2912 Ptr<Logfile_group> pos;
2913 for(tmp.first(pos); !pos.isNull(); tmp.next(pos))
2914 if(ptr.p->m_last_read_lsn >= pos.p->m_last_read_lsn)
2915 break;
2916
2917 if(pos.isNull())
2918 tmp.add(ptr);
2919 else
2920 tmp.insert(ptr, pos);
2921
2922 ptr.p->m_state =
2923 Logfile_group::LG_EXEC_THREAD | Logfile_group::LG_READ_THREAD;
2924 }
2925 list = tmp;
2926
2927 if (found_any == false)
2928 {
2929 /**
2930 * No logfilegroup had any logfiles
2931 */
2932 jam();
2933 signal->theData[0] = reference();
2934 sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
2935 return;
2936 }
2937
2938 execute_undo_record(signal);
2939 }
2940
2941 void
read_undo_log(Signal * signal,Ptr<Logfile_group> ptr)2942 Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr)
2943 {
2944 Uint32 cnt, free= ptr.p->m_free_buffer_words;
2945
2946 if(! (ptr.p->m_state & Logfile_group::LG_EXEC_THREAD))
2947 {
2948 jam();
2949 /**
2950 * Logfile_group is done...
2951 */
2952 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_READ_THREAD;
2953 stop_run_undo_log(signal);
2954 return;
2955 }
2956
2957 if(free <= File_formats::UNDO_PAGE_WORDS)
2958 {
2959 signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2960 signal->theData[1] = ptr.i;
2961 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
2962 return;
2963 }
2964
2965 Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
2966 Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
2967
2968 if(producer.m_current_page.m_idx == 0)
2969 {
2970 /**
2971 * zero pages left in range -> switch range
2972 */
2973 Lgman::Page_map::Iterator it;
2974 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
2975 Uint32 sz = map.getSize();
2976 Uint32 pos= (producer.m_current_page.m_ptr_i + sz - 2) % sz;
2977 map.position(it, pos);
2978 union {
2979 Uint32 _tmp[2];
2980 Lgman::Buffer_idx range;
2981 };
2982 _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
2983 producer.m_current_page.m_ptr_i = pos;
2984 producer.m_current_page.m_idx = range.m_idx;
2985 producer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx;
2986 }
2987
2988 if(producer.m_current_page.m_ptr_i == consumer.m_current_page.m_ptr_i &&
2989 producer.m_current_pos.m_ptr_i > consumer.m_current_pos.m_ptr_i)
2990 {
2991 Uint32 max=
2992 producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1;
2993 ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
2994 cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
2995 ndbrequire(cnt <= max);
2996 producer.m_current_pos.m_ptr_i -= cnt;
2997 producer.m_current_page.m_idx -= cnt;
2998 }
2999 else
3000 {
3001 Uint32 max= producer.m_current_page.m_idx;
3002 ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
3003 cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
3004 ndbrequire(cnt <= max);
3005 producer.m_current_pos.m_ptr_i -= cnt;
3006 producer.m_current_page.m_idx -= cnt;
3007 }
3008
3009 ndbrequire(free >= cnt * File_formats::UNDO_PAGE_WORDS);
3010 free -= (cnt * File_formats::UNDO_PAGE_WORDS);
3011 ptr.p->m_free_buffer_words = free;
3012 ptr.p->m_pos[PRODUCER] = producer;
3013
3014 signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
3015 signal->theData[1] = ptr.i;
3016
3017 if(free > File_formats::UNDO_PAGE_WORDS)
3018 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
3019 else
3020 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
3021 }
3022
3023 Uint32
read_undo_pages(Signal * signal,Ptr<Logfile_group> ptr,Uint32 pageId,Uint32 pages)3024 Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr,
3025 Uint32 pageId, Uint32 pages)
3026 {
3027 ndbrequire(pages);
3028 Ptr<Undofile> filePtr;
3029 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
3030 m_file_pool.getPtr(filePtr, tail.m_ptr_i);
3031
3032 if(filePtr.p->m_online.m_outstanding > 0)
3033 {
3034 jam();
3035 return 0;
3036 }
3037
3038 Uint32 max= tail.m_idx;
3039
3040 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
3041 req->filePointer = filePtr.p->m_fd;
3042 req->userReference = reference();
3043 req->userPointer = filePtr.i;
3044 req->operationFlag = 0;
3045 FsReadWriteReq::setFormatFlag(req->operationFlag,
3046 FsReadWriteReq::fsFormatSharedPage);
3047
3048
3049 if(max > pages)
3050 {
3051 jam();
3052 tail.m_idx -= pages;
3053
3054 req->varIndex = 1 + tail.m_idx;
3055 req->numberOfPages = pages;
3056 req->data.pageData[0] = pageId - pages;
3057 ptr.p->m_file_pos[TAIL] = tail;
3058
3059 if(DEBUG_UNDO_EXECUTION)
3060 ndbout_c("a reading from file: %d page(%d-%d) into (%d-%d)",
3061 ptr.i, 1 + tail.m_idx, 1+tail.m_idx+pages-1,
3062 pageId - pages, pageId - 1);
3063
3064 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
3065 FsReadWriteReq::FixedLength + 1, JBA);
3066
3067 ptr.p->m_outstanding_fs++;
3068 filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
3069 filePtr.p->m_online.m_outstanding = pages;
3070 max = pages;
3071 }
3072 else
3073 {
3074 jam();
3075
3076 ndbrequire(tail.m_idx - max == 0);
3077 req->varIndex = 1;
3078 req->numberOfPages = max;
3079 req->data.pageData[0] = pageId - max;
3080
3081 if(DEBUG_UNDO_EXECUTION)
3082 ndbout_c("b reading from file: %d page(%d-%d) into (%d-%d)",
3083 ptr.i, 1 , 1+max-1,
3084 pageId - max, pageId - 1);
3085
3086 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
3087 FsReadWriteReq::FixedLength + 1, JBA);
3088
3089 ptr.p->m_outstanding_fs++;
3090 filePtr.p->m_online.m_outstanding = max;
3091 filePtr.p->m_state |= Undofile::FS_OUTSTANDING | Undofile::FS_MOVE_NEXT;
3092
3093 Ptr<Undofile> prev = filePtr;
3094 {
3095 Local_undofile_list files(m_file_pool, ptr.p->m_files);
3096 if(!files.prev(prev))
3097 {
3098 jam();
3099 files.last(prev);
3100 }
3101 }
3102 if(DEBUG_UNDO_EXECUTION)
3103 ndbout_c("changing file from %d to %d", filePtr.i, prev.i);
3104
3105 tail.m_idx= prev.p->m_file_size - 1;
3106 tail.m_ptr_i= prev.i;
3107 ptr.p->m_file_pos[TAIL] = tail;
3108 if(max < pages && filePtr.i != prev.i)
3109 max += read_undo_pages(signal, ptr, pageId - max, pages - max);
3110 }
3111
3112 return max;
3113
3114 }
3115
3116 void
execute_undo_record(Signal * signal)3117 Lgman::execute_undo_record(Signal* signal)
3118 {
3119 Uint64 lsn;
3120 const Uint32* ptr;
3121 if((ptr = get_next_undo_record(&lsn)))
3122 {
3123 Uint32 len= (* ptr) & 0xFFFF;
3124 Uint32 type= (* ptr) >> 16;
3125 Uint32 mask= type & ~(Uint32)File_formats::Undofile::UNDO_NEXT_LSN;
3126 switch(mask){
3127 case File_formats::Undofile::UNDO_END:
3128 stop_run_undo_log(signal);
3129 return;
3130 case File_formats::Undofile::UNDO_LCP:
3131 case File_formats::Undofile::UNDO_LCP_FIRST:
3132 {
3133 Uint32 lcp = * (ptr - len + 1);
3134 if(m_latest_lcp && lcp > m_latest_lcp)
3135 {
3136 if (0)
3137 {
3138 const Uint32 * base = ptr - len + 1;
3139 Uint32 lcp = base[0];
3140 Uint32 tableId = base[1] >> 16;
3141 Uint32 fragId = base[1] & 0xFFFF;
3142
3143 ndbout_c("NOT! ignoring lcp: %u tab: %u frag: %u",
3144 lcp, tableId, fragId);
3145 }
3146 }
3147
3148 if(m_latest_lcp == 0 ||
3149 lcp < m_latest_lcp ||
3150 (lcp == m_latest_lcp &&
3151 mask == File_formats::Undofile::UNDO_LCP_FIRST))
3152 {
3153 stop_run_undo_log(signal);
3154 return;
3155 }
3156 // Fallthrough
3157 }
3158 case File_formats::Undofile::UNDO_TUP_ALLOC:
3159 case File_formats::Undofile::UNDO_TUP_UPDATE:
3160 case File_formats::Undofile::UNDO_TUP_FREE:
3161 case File_formats::Undofile::UNDO_TUP_CREATE:
3162 case File_formats::Undofile::UNDO_TUP_DROP:
3163 case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
3164 case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
3165 {
3166 Dbtup_client tup(this, m_tup);
3167 tup.disk_restart_undo(signal, lsn, mask, ptr - len + 1, len);
3168 jamEntry();
3169 }
3170 return;
3171 default:
3172 ndbrequire(false);
3173 }
3174 }
3175 signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
3176 sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 1, JBB);
3177
3178 return;
3179 }
3180
3181 const Uint32*
get_next_undo_record(Uint64 * this_lsn)3182 Lgman::get_next_undo_record(Uint64 * this_lsn)
3183 {
3184 Ptr<Logfile_group> ptr;
3185 m_logfile_group_list.first(ptr);
3186
3187 Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
3188 Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
3189 if(producer.m_current_pos.m_idx < 2)
3190 {
3191 jam();
3192 /**
3193 * Wait for fetching pages...
3194 */
3195 return 0;
3196 }
3197
3198 Uint32 pos = consumer.m_current_pos.m_idx;
3199 Uint32 page = consumer.m_current_pos.m_ptr_i;
3200
3201 File_formats::Undofile::Undo_page* pageP=(File_formats::Undofile::Undo_page*)
3202 m_shared_page_pool.getPtr(page);
3203
3204 if(pos == 0)
3205 {
3206 /**
3207 * End of log
3208 */
3209 pageP->m_data[0] = (File_formats::Undofile::UNDO_END << 16) | 1 ;
3210 pageP->m_page_header.m_page_lsn_hi = 0;
3211 pageP->m_page_header.m_page_lsn_lo = 0;
3212 pos= consumer.m_current_pos.m_idx= pageP->m_words_used = 1;
3213 this_lsn = 0;
3214 return pageP->m_data;
3215 }
3216
3217 Uint32 *record= pageP->m_data + pos - 1;
3218 Uint32 len= (* record) & 0xFFFF;
3219 ndbrequire(len);
3220 Uint32 *prev= record - len;
3221 Uint64 lsn = 0;
3222
3223 // Same page
3224 if(((* record) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
3225 {
3226 lsn = ptr.p->m_last_read_lsn - 1;
3227 ndbrequire((Int64)lsn >= 0);
3228 }
3229 else
3230 {
3231 ndbrequire(pos >= 3);
3232 lsn += * (prev - 1); lsn <<= 32;
3233 lsn += * (prev - 0);
3234 len += 2;
3235 ndbrequire((Int64)lsn >= 0);
3236 }
3237
3238
3239 ndbrequire(pos >= len);
3240
3241 if(pos == len)
3242 {
3243 /**
3244 * Switching page
3245 */
3246 ndbrequire(producer.m_current_pos.m_idx);
3247 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --;
3248
3249 if(consumer.m_current_page.m_idx)
3250 {
3251 consumer.m_current_page.m_idx--; // left in range
3252 consumer.m_current_pos.m_ptr_i --; // page
3253 }
3254 else
3255 {
3256 // 0 pages left in range...switch range
3257 Lgman::Page_map::Iterator it;
3258 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
3259 Uint32 sz = map.getSize();
3260 Uint32 tmp = (consumer.m_current_page.m_ptr_i + sz - 2) % sz;
3261
3262 map.position(it, tmp);
3263 union {
3264 Uint32 _tmp[2];
3265 Lgman::Buffer_idx range;
3266 };
3267
3268 _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
3269
3270 consumer.m_current_page.m_idx = range.m_idx - 1; // left in range
3271 consumer.m_current_page.m_ptr_i = tmp; // pos in map
3272
3273 consumer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx - 1; // page
3274 }
3275
3276 if(DEBUG_UNDO_EXECUTION)
3277 ndbout_c("reading from %d", consumer.m_current_pos.m_ptr_i);
3278
3279 pageP=(File_formats::Undofile::Undo_page*)
3280 m_shared_page_pool.getPtr(consumer.m_current_pos.m_ptr_i);
3281
3282 pos= consumer.m_current_pos.m_idx= pageP->m_words_used;
3283
3284 Uint64 tmp = 0;
3285 tmp += pageP->m_page_header.m_page_lsn_hi; tmp <<= 32;
3286 tmp += pageP->m_page_header.m_page_lsn_lo;
3287
3288 prev = pageP->m_data + pos - 1;
3289
3290 if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
3291 {
3292 ndbrequire(lsn + 1 == ptr.p->m_last_read_lsn);
3293 }
3294
3295 ptr.p->m_pos[CONSUMER] = consumer;
3296 ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS;
3297 }
3298 else
3299 {
3300 ptr.p->m_pos[CONSUMER].m_current_pos.m_idx -= len;
3301 }
3302
3303 * this_lsn = ptr.p->m_last_read_lsn = lsn;
3304
3305 /**
3306 * Re-sort log file groups
3307 */
3308 Ptr<Logfile_group> sort = ptr;
3309 if(m_logfile_group_list.next(sort))
3310 {
3311 while(!sort.isNull() && sort.p->m_last_read_lsn > lsn)
3312 m_logfile_group_list.next(sort);
3313
3314 if(sort.i != ptr.p->nextList)
3315 {
3316 m_logfile_group_list.remove(ptr);
3317 if(sort.isNull())
3318 m_logfile_group_list.add(ptr);
3319 else
3320 m_logfile_group_list.insert(ptr, sort);
3321 }
3322 }
3323 return record;
3324 }
3325
3326 void
stop_run_undo_log(Signal * signal)3327 Lgman::stop_run_undo_log(Signal* signal)
3328 {
3329 bool running = false, outstanding = false;
3330 Ptr<Logfile_group> ptr;
3331 m_logfile_group_list.first(ptr);
3332 while(!ptr.isNull())
3333 {
3334 /**
3335 * Mark exec thread as completed
3336 */
3337 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_EXEC_THREAD;
3338
3339 if(ptr.p->m_state & Logfile_group::LG_READ_THREAD)
3340 {
3341 /**
3342 * Thread is still running...wait for it to complete
3343 */
3344 running = true;
3345 }
3346 else if(ptr.p->m_outstanding_fs)
3347 {
3348 outstanding = true; // a FSREADREQ is outstanding...wait for it
3349 }
3350 else if(ptr.p->m_state != Logfile_group::LG_ONLINE)
3351 {
3352 /**
3353 * Fix log TAIL
3354 */
3355 ndbrequire(ptr.p->m_state == 0);
3356 ptr.p->m_state = Logfile_group::LG_ONLINE;
3357 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
3358 Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
3359
3360 while(pages)
3361 {
3362 Ptr<Undofile> file;
3363 m_file_pool.getPtr(file, tail.m_ptr_i);
3364 Uint32 page= tail.m_idx;
3365 Uint32 size= file.p->m_file_size;
3366 ndbrequire(size >= page);
3367 Uint32 diff= size - page;
3368
3369 if(pages >= diff)
3370 {
3371 pages -= diff;
3372 Local_undofile_list files(m_file_pool, ptr.p->m_files);
3373 if(!files.next(file))
3374 files.first(file);
3375 tail.m_idx = 1;
3376 tail.m_ptr_i= file.i;
3377 }
3378 else
3379 {
3380 tail.m_idx += pages;
3381 pages= 0;
3382 }
3383 }
3384 ptr.p->m_tail_pos[0] = tail;
3385 ptr.p->m_tail_pos[1] = tail;
3386 ptr.p->m_tail_pos[2] = tail;
3387 ptr.p->m_file_pos[TAIL] = tail;
3388
3389 init_logbuffer_pointers(ptr);
3390
3391 {
3392 Buffer_idx head= ptr.p->m_file_pos[HEAD];
3393 Ptr<Undofile> file;
3394 m_file_pool.getPtr(file, head.m_ptr_i);
3395 if (head.m_idx == file.p->m_file_size - 1)
3396 {
3397 Local_undofile_list files(m_file_pool, ptr.p->m_files);
3398 if(!files.next(file))
3399 {
3400 jam();
3401 files.first(file);
3402 }
3403 head.m_idx = 0;
3404 head.m_ptr_i = file.i;
3405 ptr.p->m_file_pos[HEAD] = head;
3406 }
3407 }
3408
3409 client_lock(number(), __LINE__);
3410 ptr.p->m_free_file_words = (Uint64)File_formats::UNDO_PAGE_WORDS *
3411 (Uint64)compute_free_file_pages(ptr);
3412 client_unlock(number(), __LINE__);
3413 ptr.p->m_next_reply_ptr_i = ptr.p->m_file_pos[HEAD].m_ptr_i;
3414
3415 ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
3416 signal->theData[0] = LgmanContinueB::FLUSH_LOG;
3417 signal->theData[1] = ptr.i;
3418 signal->theData[2] = 0;
3419 sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
3420
3421 if(1)
3422 {
3423 SimulatedBlock* fs = globalData.getBlock(NDBFS);
3424 Ptr<Undofile> hf, tf;
3425 m_file_pool.getPtr(tf, tail.m_ptr_i);
3426 m_file_pool.getPtr(hf, ptr.p->m_file_pos[HEAD].m_ptr_i);
3427 infoEvent("Logfile group: %d ", ptr.p->m_logfile_group_id);
3428 g_eventLogger->info("Logfile group: %d ", ptr.p->m_logfile_group_id);
3429 infoEvent(" head: %s page: %d",
3430 fs->get_filename(hf.p->m_fd),
3431 ptr.p->m_file_pos[HEAD].m_idx);
3432 g_eventLogger->info(" head: %s page: %d",
3433 fs->get_filename(hf.p->m_fd),
3434 ptr.p->m_file_pos[HEAD].m_idx);
3435 infoEvent(" tail: %s page: %d",
3436 fs->get_filename(tf.p->m_fd), tail.m_idx);
3437 g_eventLogger->info(" tail: %s page: %d",
3438 fs->get_filename(tf.p->m_fd), tail.m_idx);
3439 }
3440 }
3441
3442 m_logfile_group_list.next(ptr);
3443 }
3444
3445 if(running)
3446 {
3447 jam();
3448 return;
3449 }
3450
3451 if(outstanding)
3452 {
3453 jam();
3454 signal->theData[0] = LgmanContinueB::STOP_UNDO_LOG;
3455 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 1);
3456 return;
3457 }
3458
3459 infoEvent("Flushing page cache after undo completion");
3460 g_eventLogger->info("Flushing page cache after undo completion");
3461
3462 /**
3463 * Start flushing pages (local, LCP)
3464 */
3465 LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
3466 ord->lcpId = m_latest_lcp;
3467 sendSignal(PGMAN_REF, GSN_LCP_FRAG_ORD, signal,
3468 LcpFragOrd::SignalLength, JBB);
3469
3470 EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
3471 req->senderData = 0;
3472 req->senderRef = reference();
3473 req->backupId = m_latest_lcp;
3474 sendSignal(PGMAN_REF, GSN_END_LCP_REQ, signal,
3475 EndLcpReq::SignalLength, JBB);
3476 }
3477
3478 void
execEND_LCP_CONF(Signal * signal)3479 Lgman::execEND_LCP_CONF(Signal* signal)
3480 {
3481 {
3482 Dbtup_client tup(this, m_tup);
3483 tup.disk_restart_undo(signal, 0, File_formats::Undofile::UNDO_END, 0, 0);
3484 jamEntry();
3485 }
3486
3487 /**
3488 * pgman has completed flushing all pages
3489 *
3490 * insert "fake" LCP record preventing undo to be "rerun"
3491 */
3492 Uint32 undo[3];
3493 undo[0] = m_latest_lcp;
3494 undo[1] = (0 << 16) | 0;
3495 undo[2] = (File_formats::Undofile::UNDO_LCP_FIRST << 16 )
3496 | (sizeof(undo) >> 2);
3497
3498 Ptr<Logfile_group> ptr;
3499 ndbrequire(m_logfile_group_list.first(ptr));
3500
3501 Uint64 last_lsn= m_last_lsn;
3502 if(ptr.p->m_last_lsn == last_lsn
3503 #ifdef VM_TRACE
3504 && ((rand() % 100) > 50)
3505 #endif
3506 )
3507 {
3508 undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
3509 Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
3510 memcpy(dst, undo, sizeof(undo));
3511 ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
3512 ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
3513 }
3514 else
3515 {
3516 Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
3517 * dst++ = (Uint32)(last_lsn >> 32);
3518 * dst++ = (Uint32)(last_lsn & 0xFFFFFFFF);
3519 memcpy(dst, undo, sizeof(undo));
3520 ndbrequire(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
3521 ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
3522 }
3523 m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
3524
3525 ptr.p->m_last_synced_lsn = last_lsn;
3526 while(m_logfile_group_list.next(ptr))
3527 ptr.p->m_last_synced_lsn = last_lsn;
3528
3529 infoEvent("Flushing complete");
3530 g_eventLogger->info("Flushing complete");
3531
3532 signal->theData[0] = reference();
3533 sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
3534 }
3535
#ifdef VM_TRACE
/**
 * Consistency check, compiled in VM_TRACE builds only.
 *
 * Verifies that the number of free pages reported by
 * compute_free_file_pages() is at least the number of pages needed to hold
 * m_free_file_words (rounded up to whole pages).  On a mismatch the
 * head/tail positions and the counters are printed to ndbout, prefixed by
 * heading, and the node is stopped through ndbrequire.
 *
 * Nothing is checked while the HEAD file position is unset (RNIL).
 *
 * @param ptr      logfile group to validate
 * @param heading  text printed in front of the diagnostic line
 */
void
Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading)
{
  if (ptr.p->m_file_pos[HEAD].m_ptr_i == RNIL)
  {
    return;
  }

  const Uint32 pages = compute_free_file_pages(ptr);
  const Uint32 group_pages =
    ((ptr.p->m_free_file_words + File_formats::UNDO_PAGE_WORDS - 1) /
     File_formats::UNDO_PAGE_WORDS);

  if (pages >= group_pages)
  {
    return;
  }

  // Words counted in the last, partially accounted page
  const Uint32 last = ptr.p->m_free_file_words % File_formats::UNDO_PAGE_WORDS;

  ndbout << heading << " Tail: " << ptr.p->m_file_pos[TAIL]
         << " Head: " << ptr.p->m_file_pos[HEAD]
         << " free: " << group_pages << "(" << last << ")"
         << " found: " << pages;
  for (Uint32 n = 0; n < 3; n++)
  {
    ndbout << " - " << ptr.p->m_tail_pos[n];
  }
  ndbout << endl;

  ndbrequire(pages >= group_pages);
}
#endif
3568
execGET_TABINFOREQ(Signal * signal)3569 void Lgman::execGET_TABINFOREQ(Signal* signal)
3570 {
3571 jamEntry();
3572
3573 if(!assembleFragments(signal))
3574 {
3575 return;
3576 }
3577
3578 GetTabInfoReq * const req = (GetTabInfoReq *)&signal->theData[0];
3579
3580 const Uint32 reqType = req->requestType & (~GetTabInfoReq::LongSignalConf);
3581 BlockReference retRef= req->senderRef;
3582 Uint32 senderData= req->senderData;
3583 Uint32 tableId= req->tableId;
3584
3585 if(reqType == GetTabInfoReq::RequestByName)
3586 {
3587 jam();
3588 SectionHandle handle(this, signal);
3589 releaseSections(handle);
3590
3591 sendGET_TABINFOREF(signal, req, GetTabInfoRef::NoFetchByName);
3592 return;
3593 }
3594
3595 Logfile_group key;
3596 key.m_logfile_group_id= tableId;
3597 Ptr<Logfile_group> ptr;
3598 m_logfile_group_hash.find(ptr, key);
3599
3600 if(ptr.p->m_logfile_group_id != tableId)
3601 {
3602 jam();
3603
3604 sendGET_TABINFOREF(signal, req, GetTabInfoRef::InvalidTableId);
3605 return;
3606 }
3607
3608
3609 GetTabInfoConf *conf = (GetTabInfoConf *)&signal->theData[0];
3610
3611 conf->senderData= senderData;
3612 conf->tableId= tableId;
3613 conf->freeWordsHi= (Uint32)(ptr.p->m_free_file_words >> 32);
3614 conf->freeWordsLo= (Uint32)(ptr.p->m_free_file_words & 0xFFFFFFFF);
3615 conf->tableType= DictTabInfo::LogfileGroup;
3616 conf->senderRef= reference();
3617 sendSignal(retRef, GSN_GET_TABINFO_CONF, signal,
3618 GetTabInfoConf::SignalLength, JBB);
3619 }
3620
sendGET_TABINFOREF(Signal * signal,GetTabInfoReq * req,GetTabInfoRef::ErrorCode errorCode)3621 void Lgman::sendGET_TABINFOREF(Signal* signal,
3622 GetTabInfoReq * req,
3623 GetTabInfoRef::ErrorCode errorCode)
3624 {
3625 jamEntry();
3626 GetTabInfoRef * const ref = (GetTabInfoRef *)&signal->theData[0];
3627 /**
3628 * The format of GetTabInfo Req/Ref is the same
3629 */
3630 BlockReference retRef = req->senderRef;
3631 ref->errorCode = errorCode;
3632
3633 sendSignal(retRef, GSN_GET_TABINFOREF, signal, signal->length(), JBB);
3634 }
3635