1 /* Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 // can be removed if DBTUP continueB codes are moved to signaldata
24 #define DBTUP_C
25 
26 #include "DbtupProxy.hpp"
27 #include "Dbtup.hpp"
28 #include <pgman.hpp>
29 #include <signaldata/LgmanContinueB.hpp>
30 
31 #include <EventLogger.hpp>
32 
33 #define JAM_FILE_ID 413
34 
35 extern EventLogger * g_eventLogger;
36 
37 #ifdef VM_TRACE
//#define DEBUG_TUP_RESTART 1
39 #endif
40 
41 #ifdef DEBUG_TUP_RESTART
42 #define DEB_TUP_RESTART(arglist) do { g_eventLogger->info arglist ; } while (0)
43 #else
44 #define DEB_TUP_RESTART(arglist) do { } while (0)
45 #endif
46 
/**
 * Proxy block for DBTUP: owns the per-LDM Dbtup workers and routes
 * block-level signals to them.  Registers the signal handlers that the
 * proxy itself intercepts; all other signals go through LocalProxy.
 */
DbtupProxy::DbtupProxy(Block_context& ctx) :
  LocalProxy(DBTUP, ctx),
  c_pgman(0),          // resolved in STTOR phase 1
  c_tableRecSize(0),   // sized from configuration in READ_CONFIG_REQ
  c_tableRec(0)        // schema version per table id, 0 = no table
{
  // GSN_CREATE_TAB_REQ
  addRecSignal(GSN_CREATE_TAB_REQ, &DbtupProxy::execCREATE_TAB_REQ);
  addRecSignal(GSN_DROP_TAB_REQ, &DbtupProxy::execDROP_TAB_REQ);

  // GSN_BUILD_INDX_IMPL_REQ
  addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &DbtupProxy::execBUILD_INDX_IMPL_REQ);
  addRecSignal(GSN_BUILD_INDX_IMPL_CONF, &DbtupProxy::execBUILD_INDX_IMPL_CONF);
  addRecSignal(GSN_BUILD_INDX_IMPL_REF, &DbtupProxy::execBUILD_INDX_IMPL_REF);
}
62 
// Nothing to release here; worker blocks are owned by the framework.
DbtupProxy::~DbtupProxy()
{
}
66 
// Factory used by LocalProxy to create one Dbtup worker per LDM instance.
SimulatedBlock*
DbtupProxy::newWorker(Uint32 instanceNo)
{
  return new Dbtup(m_ctx, instanceNo);
}
72 
73 // GSN_READ_CONFIG_REQ
74 void
callREAD_CONFIG_REQ(Signal * signal)75 DbtupProxy::callREAD_CONFIG_REQ(Signal* signal)
76 {
77   jam();
78   const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
79   ndbrequire(req->noOfParameters == 0);
80 
81   const ndb_mgm_configuration_iterator * p =
82     m_ctx.m_config.getOwnConfigIterator();
83   ndbrequire(p != 0);
84 
85   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_TABLE, &c_tableRecSize));
86   c_tableRec = (Uint32*)allocRecord("TableRec", sizeof(Uint32),
87                                     c_tableRecSize);
88   D("proxy:" << V(c_tableRecSize));
89   Uint32 i;
90   for (i = 0; i < c_tableRecSize; i++)
91     c_tableRec[i] = 0;
92   backREAD_CONFIG_REQ(signal);
93 }
94 
95 // GSN_STTOR
96 
97 void
callSTTOR(Signal * signal)98 DbtupProxy::callSTTOR(Signal* signal)
99 {
100   jam();
101   Uint32 startPhase = signal->theData[1];
102   switch (startPhase) {
103   case 1:
104     jam();
105     c_pgman = (Pgman*)globalData.getBlock(PGMAN);
106     c_tsman = (Tsman*)globalData.getBlock(TSMAN);
107     ndbrequire(c_pgman != 0);
108     ndbrequire(c_tsman != 0);
109     break;
110   }
111   backSTTOR(signal);
112 }
113 
// GSN_CREATE_TAB_REQ

/**
 * Record the create-schema-version of a new table.  Restart code later
 * compares extent/page versions against this entry to decide whether
 * disk data really belongs to the current incarnation of the table.
 */
void
DbtupProxy::execCREATE_TAB_REQ(Signal* signal)
{
  jam();
  const CreateTabReq* req = (const CreateTabReq*)signal->getDataPtr();
  const Uint32 tableId = req->tableId;
  // Only the low 24 bits of tableVersion carry the create-version.
  const Uint32 create_table_schema_version = req->tableVersion & 0xFFFFFF;
  ndbrequire(tableId < c_tableRecSize);
  ndbrequire(create_table_schema_version != 0);
  ndbrequire(c_tableRec[tableId] == 0);  // slot must be free
  c_tableRec[tableId] = create_table_schema_version;
  DEB_TUP_RESTART(("Create table: %u", tableId));
  D("proxy: created table" << V(tableId));
}
130 
131 void
execDROP_TAB_REQ(Signal * signal)132 DbtupProxy::execDROP_TAB_REQ(Signal* signal)
133 {
134   jam();
135   const DropTabReq* req = (const DropTabReq*)signal->getDataPtr();
136   const Uint32 tableId = req->tableId;
137   ndbrequire(tableId < c_tableRecSize);
138   c_tableRec[tableId] = 0;
139   DEB_TUP_RESTART(("Dropped table: %u", tableId));
140   D("proxy: dropped table" << V(tableId));
141 }
142 
143 // GSN_BUILD_INDX_IMPL_REQ
144 
145 void
execBUILD_INDX_IMPL_REQ(Signal * signal)146 DbtupProxy::execBUILD_INDX_IMPL_REQ(Signal* signal)
147 {
148   jam();
149   const BuildIndxImplReq* req = (const BuildIndxImplReq*)signal->getDataPtr();
150   Ss_BUILD_INDX_IMPL_REQ& ss = ssSeize<Ss_BUILD_INDX_IMPL_REQ>();
151   ss.m_req = *req;
152   ndbrequire(signal->getLength() == BuildIndxImplReq::SignalLength);
153   sendREQ(signal, ss);
154 }
155 
/**
 * Forward the saved BUILD_INDX_IMPL_REQ to one worker, stamped with our
 * reference and the ssId so the worker's reply can be matched back to
 * the Ss record.
 */
void
DbtupProxy::sendBUILD_INDX_IMPL_REQ(Signal* signal, Uint32 ssId,
                                    SectionHandle* handle)
{
  jam();
  Ss_BUILD_INDX_IMPL_REQ& ss = ssFind<Ss_BUILD_INDX_IMPL_REQ>(ssId);

  BuildIndxImplReq* req = (BuildIndxImplReq*)signal->getDataPtrSend();
  *req = ss.m_req;
  req->senderRef = reference();
  req->senderData = ssId;
  // NoRelease: the same sections are re-sent to every worker, so they
  // must not be freed here.
  sendSignalNoRelease(workerRef(ss.m_worker), GSN_BUILD_INDX_IMPL_REQ,
                      signal, BuildIndxImplReq::SignalLength, JBB, handle);
}
170 
171 void
execBUILD_INDX_IMPL_CONF(Signal * signal)172 DbtupProxy::execBUILD_INDX_IMPL_CONF(Signal* signal)
173 {
174   jam();
175   const BuildIndxImplConf* conf = (const BuildIndxImplConf*)signal->getDataPtr();
176   Uint32 ssId = conf->senderData;
177   Ss_BUILD_INDX_IMPL_REQ& ss = ssFind<Ss_BUILD_INDX_IMPL_REQ>(ssId);
178   recvCONF(signal, ss);
179 }
180 
181 void
execBUILD_INDX_IMPL_REF(Signal * signal)182 DbtupProxy::execBUILD_INDX_IMPL_REF(Signal* signal)
183 {
184   jam();
185   const BuildIndxImplRef* ref = (const BuildIndxImplRef*)signal->getDataPtr();
186   Uint32 ssId = ref->senderData;
187   Ss_BUILD_INDX_IMPL_REQ& ss = ssFind<Ss_BUILD_INDX_IMPL_REQ>(ssId);
188   recvREF(signal, ss, ref->errorCode);
189 }
190 
/**
 * Called once per worker reply.  When the last reply has arrived, send
 * a single aggregated CONF (all workers succeeded) or REF (first error
 * seen) back to the original requester and release the Ss record.
 */
void
DbtupProxy::sendBUILD_INDX_IMPL_CONF(Signal* signal, Uint32 ssId)
{
  jam();
  Ss_BUILD_INDX_IMPL_REQ& ss = ssFind<Ss_BUILD_INDX_IMPL_REQ>(ssId);
  BlockReference dictRef = ss.m_req.senderRef;

  // Wait until every worker has answered.
  if (!lastReply(ss))
  {
    jam();
    return;
  }

  if (ss.m_error == 0)
  {
    jam();
    BuildIndxImplConf* conf = (BuildIndxImplConf*)signal->getDataPtrSend();
    conf->senderRef = reference();
    conf->senderData = ss.m_req.senderData;
    sendSignal(dictRef, GSN_BUILD_INDX_IMPL_CONF,
               signal, BuildIndxImplConf::SignalLength, JBB);
  }
  else
  {
    jam();
    BuildIndxImplRef* ref = (BuildIndxImplRef*)signal->getDataPtrSend();
    ref->senderRef = reference();
    ref->senderData = ss.m_req.senderData;
    ref->errorCode = ss.m_error;
    sendSignal(dictRef, GSN_BUILD_INDX_IMPL_REF,
               signal, BuildIndxImplRef::SignalLength, JBB);
  }

  ssRelease<Ss_BUILD_INDX_IMPL_REQ>(ssId);
}
226 
227 // client methods
228 
229 // LGMAN
230 
/**
 * Reset the (single, shared) undo-record state to "not in use".
 * ~0 marks ids as invalid / not yet resolved; the record is re-placed
 * with placement new for every incoming UNDO log record.
 */
DbtupProxy::Proxy_undo::Proxy_undo()
{
  m_type = 0;
  m_len = 0;
  m_ptr = 0;
  m_lsn = (Uint64)0;
  m_key.setNull();
  m_page_id = ~(Uint32)0;
  m_table_id = ~(Uint32)0;
  m_fragment_id = ~(Uint32)0;
  m_instance_no = ~(Uint32)0;
  m_actions = 0;
  m_in_use = false;
}
245 
/**
 * Entry point called from LGMAN during restart for each UNDO log
 * record.  Classifies the record type, decides which LDM instance(s)
 * it concerns, and either forwards it directly or first resolves the
 * page's table id / fragment id (via TSMAN extent info, or by reading
 * the page itself through disk_restart_undo_callback).
 *
 * @param lsn  log sequence number of this record
 * @param type File_formats::Undofile record type
 * @param ptr  record data; memory owned by LGMAN (may be overwritten
 *             once we yield, see the memcpy below)
 * @param len  record length in words
 */
void
DbtupProxy::disk_restart_undo(Signal* signal, Uint64 lsn,
                              Uint32 type, const Uint32 * ptr, Uint32 len)
{
  Proxy_undo& undo = c_proxy_undo;
  ndbrequire(!undo.m_in_use);
  // Re-initialize the shared record in place for this log record.
  new (&undo) Proxy_undo;
  undo.m_in_use = true;

  D("proxy: disk_restart_undo" << V(type) << hex << V(ptr) << dec << V(len) << V(lsn));
  undo.m_type = type;
  undo.m_len = len;
  undo.m_ptr = ptr; /* Ptr to memory managed by lgman */
  undo.m_lsn = lsn;

  ndbrequire(undo.m_len <= MAX_UNDO_DATA);

  /**
   * All the logic about when to stop executing the UNDO log
   * is in lgman.cpp. So this code assumes that we haven't
   * yet reached the end of the UNDO log execution.
   */
  switch (undo.m_type) {
  case File_formats::Undofile::UNDO_LOCAL_LCP_FIRST:
  case File_formats::Undofile::UNDO_LOCAL_LCP:
  {
    jam();
    // Table/fragment are packed into one word of the record.
    undo.m_table_id = ptr[2] >> 16;
    undo.m_fragment_id = ptr[2] & 0xFFFF;
    undo.m_actions |= Proxy_undo::SendToAll;
    undo.m_actions |= Proxy_undo::SendUndoNext;
    break;
  }
  case File_formats::Undofile::UNDO_LCP_FIRST:
  case File_formats::Undofile::UNDO_LCP:
  {
    jam();
    /**
     * This is the start of the UNDO log, this is the synchronisation
     * point, so we will UNDO information back to here. After this
     * we don't need any more UNDO logging, we do still however need
     * to use the UNDO logs to synchronize the extent bits with the
     * page information.
     */
    undo.m_table_id = ptr[1] >> 16;
    undo.m_fragment_id = ptr[1] & 0xFFFF;
    undo.m_actions |= Proxy_undo::SendToAll;
    undo.m_actions |= Proxy_undo::SendUndoNext;
    break;
  }
  case File_formats::Undofile::UNDO_TUP_ALLOC:
  {
    jam();
    const Dbtup::Disk_undo::Alloc* rec =
      (const Dbtup::Disk_undo::Alloc*)ptr;
    undo.m_key.m_file_no = rec->m_file_no_page_idx >> 16;
    undo.m_key.m_page_no = rec->m_page_no;
    undo.m_key.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
    undo.m_actions |= Proxy_undo::ReadTupPage;
    undo.m_actions |= Proxy_undo::GetInstance;
    /**
     * Setting the SendUndoNext flag makes DbtupProxy send a CONTINUEB to
     * LGMAN thread immediately after sending the current undo record to the
     * respective LDM.
     * This enables LGMAN to proceed to the next undo record immediately.
     * If SendUndoNext is not set,  LGMAN waits for a CONTINUEB from LDM to
     * confirm completion and then proceeds to fetch the next undo record.
     */
    undo.m_actions |= Proxy_undo::SendUndoNext; // Don't wait for LDM

    break;
  }
  case File_formats::Undofile::UNDO_TUP_UPDATE:
  case File_formats::Undofile::UNDO_TUP_FIRST_UPDATE_PART:
  {
    jam();
    const Dbtup::Disk_undo::Update* rec =
      (const Dbtup::Disk_undo::Update*)ptr;
    undo.m_key.m_file_no = rec->m_file_no_page_idx >> 16;
    undo.m_key.m_page_no = rec->m_page_no;
    undo.m_key.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
    undo.m_actions |= Proxy_undo::ReadTupPage;
    undo.m_actions |= Proxy_undo::GetInstance;
    undo.m_actions |= Proxy_undo::SendUndoNext; // Don't wait for LDM
    break;
  }
  case File_formats::Undofile::UNDO_TUP_UPDATE_PART:
  {
    jam();
    const Dbtup::Disk_undo::UpdatePart* rec =
      (const Dbtup::Disk_undo::UpdatePart*)ptr;
    undo.m_key.m_file_no = rec->m_file_no_page_idx >> 16;
    undo.m_key.m_page_no = rec->m_page_no;
    undo.m_key.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
    undo.m_actions |= Proxy_undo::ReadTupPage;
    undo.m_actions |= Proxy_undo::GetInstance;
    undo.m_actions |= Proxy_undo::SendUndoNext; // Don't wait for LDM
    break;
  }
  case File_formats::Undofile::UNDO_TUP_FREE:
  case File_formats::Undofile::UNDO_TUP_FREE_PART:
  {
    jam();
    const Dbtup::Disk_undo::Free* rec =
      (const Dbtup::Disk_undo::Free*)ptr;
    undo.m_key.m_file_no = rec->m_file_no_page_idx >> 16;
    undo.m_key.m_page_no = rec->m_page_no;
    undo.m_key.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
    undo.m_actions |= Proxy_undo::ReadTupPage;
    undo.m_actions |= Proxy_undo::GetInstance;
    undo.m_actions |= Proxy_undo::SendUndoNext; // Don't wait for LDM
    break;
  }
  case File_formats::Undofile::UNDO_TUP_DROP:
  {
    jam();
    /**
     * A table was dropped during UNDO log writing. This means that the
     * table is no longer present, if no LCP record or CREATE record have
     * occurred before this record then this is also a synch point. This
     * synch point also says that the table is empty, but here the table
     * as such should not be remaining either.
     */
    undo.m_actions |= Proxy_undo::SendToAll;
    undo.m_actions |= Proxy_undo::SendUndoNext;
    break;
  }
  case File_formats::Undofile::UNDO_END:
  {
    jam();
    undo.m_actions |= Proxy_undo::SendToAll;
    break;
  }
  default:
    ndbabort();
  }

  if (undo.m_actions & Proxy_undo::ReadTupPage) {
    jam();
    Uint32 create_table_version;
    Uint32 tableId;
    Uint32 fragId;
    {
      /**
       * A quick way to get the page id translated to table id
       * and fragment id is to get it from the extent information
       * stored in TSMAN. It creates a contention point around
       * TSMAN that eventually will have to be resolved, but it
       * should be much preferred to reading each page from disk
       * to be able to get the fragment id and table id.
       *
       * In rare cases we will not find a translation, in this case
       * we need to read the page to get the table id and fragment
       * id. If the page also doesn't contain any valid table id
       * and fragment id then it is safe to ignore the log record
       * since there is no page written that needs UNDO on.
       *
       * We need the table id and fragment id to be able to map the
       * page id to the correct LDM instance.
       */
      Tablespace_client tsman(signal, this, c_tsman,
                              0, 0, 0, 0); //Ignored for this call
      tsman.get_extent_info(undo.m_key);
      tableId = tsman.get_table_id();
      fragId = tsman.get_fragment_id();
      create_table_version = tsman.get_create_table_version();
    }

    if (tableId >= c_tableRecSize ||
        c_tableRec[tableId] != create_table_version)
    {
      jam();
      /*
       * The timeslice via PGMAN/5 gives LGMAN a chance to overwrite the
       * data pointed to by ptr.  So save it now and do not use ptr.
       */
      memcpy(undo.m_data, undo.m_ptr, undo.m_len << 2);
      undo.m_ptr = undo.m_data;
      /*
       * An invalid table id or an incorrect table version means that
       * the extent information was not to be trusted. We attempt to
       * read the page and get information from there instead.
       *
       * Page request goes to the extra PGMAN worker (our thread).
       * TUP worker reads same page again via another PGMAN worker.
       * MT-LGMAN is planned, do not optimize (pass page) now
       *
       * We need to read page in order to get table id and fragment id.
       * This is not part of the UNDO log information and this information
       * is required such that we can map this to the correct LDM
       * instance. We will not make page dirty, so it will be replaced
       * as soon as we need a dirty page or we're out of pages in this
       * PGMAN instance.
       */
      Page_cache_client pgman(this, c_pgman);
      Page_cache_client::Request req;

      /**
       * Ensure that we crash if we try to make a LCP of this page
       * later, should never happen since we never do any LCP of
       * pages connected to fragments in extra pgman worker.
       * page.
       */
      req.m_table_id = RNIL;
      req.m_fragment_id = 0;
      req.m_page = undo.m_key;
      req.m_callback.m_callbackData = 0;
      req.m_callback.m_callbackFunction =
        safe_cast(&DbtupProxy::disk_restart_undo_callback);
      int flags = Page_cache_client::UNDO_GET_REQ;

      int ret = pgman.get_page(signal, req, flags);
      ndbrequire(ret >= 0);
      if (ret > 0) {
        jam();
        // Page already in cache: run the callback synchronously.
        execute(signal, req.m_callback, (Uint32)ret);
      }
      return;  // continuation happens in disk_restart_undo_callback
    }
    DEB_TUP_RESTART(("DBTUP(0) create_table_version:%u c_tableRec:%u",
                              create_table_version,
                              c_tableRec[tableId]));
    undo.m_table_id = tableId;
    undo.m_fragment_id = fragId;
  }
  disk_restart_undo_finish(signal);
}
473 
/**
 * PGMAN callback when the page referenced by the current UNDO record
 * has been read in.  Decides from the page header whether the record
 * must be executed at all and, if so, resolves table id / fragment id
 * from the page itself.  Always ends by calling
 * disk_restart_undo_finish().
 */
void
DbtupProxy::disk_restart_undo_callback(Signal* signal, Uint32, Uint32 page_id)
{
  Proxy_undo& undo = c_proxy_undo;
  undo.m_page_id = page_id;

  Ptr<GlobalPage> gptr;
  m_global_page_pool.getPtr(gptr, undo.m_page_id);

  ndbrequire(undo.m_actions & Proxy_undo::ReadTupPage);
  {
    jam();
    const Tup_page* page = (const Tup_page*)gptr.p;
    const File_formats::Page_header& header = page->m_page_header;
    const Uint32 page_type = header.m_page_type;

    if (page_type == 0)
    {
      jam();
      /**
       * Page not written, no need to worry about UNDO log, nothing
       * written that requires UNDO.
       */
      ndbrequire(header.m_page_lsn_hi == 0 && header.m_page_lsn_lo == 0);
      undo.m_actions |= Proxy_undo::NoExecute;
      undo.m_actions |= Proxy_undo::SendUndoNext;
      D("proxy: page never written" << V(page_type));
      disk_restart_undo_finish(signal);
      return;
    }
    jam();
    undo.m_table_id = page->m_table_id;
    undo.m_fragment_id = page->m_fragment_id;
    D("proxy: callback" << V(undo.m_table_id) <<
                           V(undo.m_fragment_id) <<
                           V(undo.m_create_table_version));
    const Uint32 tableId = undo.m_table_id;
    if (tableId >= c_tableRecSize ||
        c_tableRec[tableId] != page->m_create_table_version)
    {
      jam();
      /**
       * Also the table had invalid data, it have belonged to another
       * table and have never been written by this table instance. So
       * we can safely proceed.
       */
      D("proxy: page not written by this table" << V(tableId));
      undo.m_actions |= Proxy_undo::NoExecute;
      undo.m_actions |= Proxy_undo::SendUndoNext;
      disk_restart_undo_finish(signal);
      return;
    }
    ndbrequire(page_type == File_formats::PT_Tup_fixsize_page ||
               page_type == File_formats::PT_Tup_varsize_page);

    {
      /**
       * The page had a correct table id and fragment id. Since we
       * came here the extent information haven't been written.
       * Update the extent information now even if the UNDO log
       * is not to be executed.
       */
      jam();
      Tablespace_client tsman(signal,
                              this,
                              c_tsman,
                              page->m_table_id,
                              page->m_fragment_id,
                              page->m_create_table_version,
                              0); //Ignored for this call
      tsman.write_extent_info(undo.m_key);
    }
    // Reassemble the 64-bit page LSN from its two header words.
    Uint64 page_lsn = (Uint64(header.m_page_lsn_hi) << 32)
        + header.m_page_lsn_lo;
    if (! (undo.m_lsn <= page_lsn))
    {
      /**
       * Page lsn shows that this UNDO log record was never applied. So
       * no need to execute UNDO log record.
       */
      jam();
      undo.m_actions |= Proxy_undo::NoExecute;
      undo.m_actions |= Proxy_undo::SendUndoNext;
      disk_restart_undo_finish(signal);
      return;
    }
    disk_restart_undo_finish(signal);
  }
}
563 
/**
 * Tell LGMAN (via CONTINUEB) to fetch the next UNDO log record.
 *
 * @param tup_instance LDM instance the previous record was sent to,
 *        or 0 when it was sent to none or to all instances.
 */
void
DbtupProxy::disk_restart_undo_send_next(Signal *signal, Uint32 tup_instance)
{
  signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
  signal->theData[1] = 0; /* Not applied flag */
  signal->theData[2] = tup_instance;
  sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 3, JBB);
}
572 
/**
 * Final dispatch step for the current UNDO record: map it to an LDM
 * instance if needed (GetInstance), optionally let LGMAN continue
 * immediately (SendUndoNext), and send the record to one worker or to
 * all of them (SendToAll) unless execution was cancelled (NoExecute).
 * Always releases the shared Proxy_undo record at the end.
 */
void
DbtupProxy::disk_restart_undo_finish(Signal* signal)
{
  Proxy_undo& undo = c_proxy_undo;
  /**
   * The instance number of the tup instance the undo record is sent to.
   * 0 means the undo record is not sent to any LDM or it was sent to all.
   */
  Uint32 tup_instance = 0;

  if (undo.m_actions & Proxy_undo::GetInstance) {
    jam();
    Uint32 instanceKey = getInstanceKeyCanFail(undo.m_table_id,
                                               undo.m_fragment_id);
    if (instanceKey == RNIL)
    {
      jam();
      /**
       * Ignore this record since no table with this table id and
       * fragment id is currently existing.
       */
      disk_restart_undo_send_next(signal, tup_instance);
      goto finish;
    }
    Uint32 instanceNo = getInstanceFromKey(instanceKey);
    tup_instance = undo.m_instance_no = instanceNo;
  }

  if (undo.m_actions & Proxy_undo::NoExecute) {
    jam();
    // Record will not be executed; report "sent to none" to LGMAN.
    tup_instance = 0;
  }

  if (undo.m_actions & Proxy_undo::SendUndoNext) {
    jam();
    // Let LGMAN fetch the next record without waiting for the LDM.
    disk_restart_undo_send_next(signal, tup_instance);
  }

  if (undo.m_actions & Proxy_undo::NoExecute) {
    jam();
    goto finish;
  }

  if (!(undo.m_actions & Proxy_undo::SendToAll)) {
    jam();
    // Send to the single resolved LDM instance (1-based -> 0-based).
    ndbrequire(undo.m_instance_no != 0);
    Uint32 i = undo.m_instance_no - 1;
    DEB_TUP_RESTART(("DBTUP(0) SENDING lsn:%llu type:%u len:%u LDM:%u",
                            undo.m_lsn,
                            undo.m_type,
                            undo.m_len,
                            undo.m_instance_no));

    disk_restart_undo_send(signal, i);
    tup_instance = undo.m_instance_no;
  } else {
    jam();
    // Broadcast the record to every worker.
    Uint32 i;
    for (i = 0; i < c_workers; i++) {
      DEB_TUP_RESTART(("DBTUP(0) DbtupProxy SENDING lsn:%llu type:%u "
          "len:%u LDM:%u",
          undo.m_lsn,
          undo.m_type,
          undo.m_len,
          i+1));
      disk_restart_undo_send(signal, i);
    }
  }

finish:
  ndbrequire(undo.m_in_use);
  undo.m_in_use = false;
}
646 
/**
 * Ship the current UNDO record to worker i as a long CONTINUEB signal;
 * the record data travels in one linear section.
 */
void
DbtupProxy::disk_restart_undo_send(Signal* signal, Uint32 i)
{
  /*
   * Send undo entry via long signal because:
   * 1) a method call would execute in another non-mutexed Pgman
   * 2) MT-LGMAN is planned, do not optimize (pass pointer) now
   */
  Proxy_undo& undo = c_proxy_undo;

  LinearSectionPtr ptr[3];
  ptr[0].p = (Uint32 *)undo.m_ptr;
  ptr[0].sz = undo.m_len;

  signal->theData[0] = ZDISK_RESTART_UNDO;
  signal->theData[1] = undo.m_type;
  signal->theData[2] = undo.m_len;
  // 64-bit LSN is split into hi/lo words for the signal.
  signal->theData[3] = (Uint32)(undo.m_lsn >> 32);
  signal->theData[4] = (Uint32)(undo.m_lsn & 0xFFFFFFFF);
  sendSignal(workerRef(i), GSN_CONTINUEB, signal, 5, JBB, ptr, 1);
}
668 
669 // TSMAN
670 
/**
 * TSMAN client call during restart: route an extent-allocation request
 * to the Dbtup worker owning the given table/fragment.
 *
 * @return -1 if the table is unknown, its create-version does not
 *         match, or the fragment cannot be mapped to an instance;
 *         otherwise the worker's own return value.
 */
int
DbtupProxy::disk_restart_alloc_extent(EmulatedJamBuffer *jamBuf,
                                      Uint32 tableId,
                                      Uint32 fragId,
                                      Uint32 create_table_version,
                                      const Local_key* key,
                                      Uint32 pages)
{
  if (tableId >= c_tableRecSize || c_tableRec[tableId] == 0)
  {
    thrjam(jamBuf);
    D("proxy: table dropped" << V(tableId));
    DEB_TUP_RESTART(("disk_restart_alloc_extent failed on tab(%u,%u):%u,"
                     " tableId missing",
                     tableId,
                     fragId,
                     create_table_version));
    return -1;
  }
  if (c_tableRec[tableId] != create_table_version)
  {
    // Extent belongs to an older incarnation of this table id.
    thrjam(jamBuf);
    DEB_TUP_RESTART(("disk_restart_alloc_extent failed on tab(%u,%u):%u,"
                     " expected create_table_version: %u",
                     tableId,
                     fragId,
                     create_table_version,
                     c_tableRec[tableId]));
    return -1;
  }

  // local call so mapping instance key to number is ok
  thrjam(jamBuf);
  thrjamLine(jamBuf, Uint16(tableId));
  thrjamLine(jamBuf, Uint16(fragId));
  Uint32 instanceKey = getInstanceKeyCanFail(tableId, fragId);
  if (instanceKey == RNIL)
  {
    thrjam(jamBuf);
    DEB_TUP_RESTART(("disk_restart_alloc_extent failed, instanceKey = RNIL"));
    D("proxy: table either dropped, non-existent or fragment not existing"
      << V(tableId));
    return -1;
  }
  thrjam(jamBuf);
  Uint32 instanceNo = getInstanceFromKey(instanceKey);

  // Delegate to the owning worker directly (same process, local call).
  Uint32 i = workerIndex(instanceNo);
  Dbtup* dbtup = (Dbtup*)workerBlock(i);
  return dbtup->disk_restart_alloc_extent(jamBuf,
                                          tableId,
                                          fragId,
                                          create_table_version,
                                          key,
                                          pages);
}
727 
/**
 * TSMAN client call during restart: forward page-bit information to
 * the Dbtup worker owning the given table/fragment.  The table must
 * already have been validated by a preceding disk_restart_alloc_extent
 * call, hence the hard ndbrequire instead of a failure return.
 */
void
DbtupProxy::disk_restart_page_bits(Uint32 tableId,
                                   Uint32 fragId,
                                   Uint32 create_table_version,
                                   const Local_key* key,
                                   Uint32 bits)
{
  ndbrequire(tableId < c_tableRecSize &&
             c_tableRec[tableId] != 0 &&
             (create_table_version == 0 ||
              c_tableRec[tableId] == create_table_version));

  // local call so mapping instance key to number is ok
  /**
   * No need to use getInstanceKeyCanFail here since this call is
   * preceded by a call to disk_restart_alloc_extent above where
   * this is checked.
   */
  Uint32 instanceKey = getInstanceKey(tableId, fragId);
  Uint32 instanceNo = getInstanceFromKey(instanceKey);

  Uint32 i = workerIndex(instanceNo);
  Dbtup* dbtup = (Dbtup*)workerBlock(i);
  dbtup->disk_restart_page_bits(jamBuffer(),
                                tableId,
                                fragId,
                                create_table_version,
                                key,
                                bits);
}
BLOCK_FUNCTIONS(DbtupProxy)
759