1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
22 
23 #include "PgmanProxy.hpp"
24 #include "pgman.hpp"
25 #include <signaldata/DataFileOrd.hpp>
26 
27 #define JAM_FILE_ID 470
28 
29 
PgmanProxy(Block_context & ctx)30 PgmanProxy::PgmanProxy(Block_context& ctx) :
31   LocalProxy(PGMAN, ctx)
32 {
33   // GSN_LCP_FRAG_ORD
34   addRecSignal(GSN_LCP_FRAG_ORD, &PgmanProxy::execLCP_FRAG_ORD);
35 
36   // GSN_END_LCPREQ
37   addRecSignal(GSN_END_LCPREQ, &PgmanProxy::execEND_LCPREQ);
38   addRecSignal(GSN_END_LCPCONF, &PgmanProxy::execEND_LCPCONF);
39   addRecSignal(GSN_RELEASE_PAGES_CONF, &PgmanProxy::execRELEASE_PAGES_CONF);
40 }
41 
~PgmanProxy()42 PgmanProxy::~PgmanProxy()
43 {
44 }
45 
46 SimulatedBlock*
newWorker(Uint32 instanceNo)47 PgmanProxy::newWorker(Uint32 instanceNo)
48 {
49   return new Pgman(m_ctx, instanceNo);
50 }
51 
52 // GSN_LCP_FRAG_ORD
53 
54 void
execLCP_FRAG_ORD(Signal * signal)55 PgmanProxy::execLCP_FRAG_ORD(Signal* signal)
56 {
57   const LcpFragOrd* req = (const LcpFragOrd*)signal->getDataPtr();
58   Uint32 ssId = getSsId(req);
59   Ss_LCP_FRAG_ORD& ss = ssSeize<Ss_LCP_FRAG_ORD>(ssId);
60   ss.m_req = *req;
61   sendREQ(signal, ss);
62   ssRelease<Ss_LCP_FRAG_ORD>(ssId);
63 }
64 
65 void
sendLCP_FRAG_ORD(Signal * signal,Uint32 ssId,SectionHandle * handle)66 PgmanProxy::sendLCP_FRAG_ORD(Signal* signal, Uint32 ssId, SectionHandle* handle)
67 {
68   Ss_LCP_FRAG_ORD& ss = ssFind<Ss_LCP_FRAG_ORD>(ssId);
69   LcpFragOrd* req = (LcpFragOrd*)signal->getDataPtrSend();
70   *req = ss.m_req;
71   sendSignalNoRelease(workerRef(ss.m_worker), GSN_LCP_FRAG_ORD,
72                       signal, LcpFragOrd::SignalLength, JBB, handle);
73 }
74 
75 // GSN_END_LCPREQ
76 
77 void
execEND_LCPREQ(Signal * signal)78 PgmanProxy::execEND_LCPREQ(Signal* signal)
79 {
80   const EndLcpReq* req = (const EndLcpReq*)signal->getDataPtr();
81   Uint32 ssId = getSsId(req);
82   Ss_END_LCPREQ& ss = ssSeize<Ss_END_LCPREQ>(ssId);
83   ss.m_req = *req;
84 
85   const Uint32 sb = refToBlock(ss.m_req.senderRef);
86   ndbrequire(sb == DBLQH || sb == LGMAN);
87 
88   if (sb == LGMAN) {
89     jam();
90     /*
91      * At end of UNDO execution.  Extra PGMAN worker was used to
92      * read up TUP pages.  Release these pages now.
93      */
94     ReleasePagesReq* req = (ReleasePagesReq*)signal->getDataPtrSend();
95     req->senderData = ssId;
96     req->senderRef = reference();
97     req->requestType = ReleasePagesReq::RT_RELEASE_UNLOCKED;
98     req->requestData = 0;
99     // Extra worker
100     sendSignal(workerRef(c_workers - 1), GSN_RELEASE_PAGES_REQ,
101                signal, ReleasePagesReq::SignalLength, JBB);
102     return;
103   }
104   /**
105    * Send to extra PGMAN *after* all other PGMAN has completed
106    */
107   sendREQ(signal, ss, /* skip last */ true);
108 }
109 
110 void
execRELEASE_PAGES_CONF(Signal * signal)111 PgmanProxy::execRELEASE_PAGES_CONF(Signal* signal)
112 {
113   const ReleasePagesConf* conf = (const ReleasePagesConf*)signal->getDataPtr();
114   Uint32 ssId = getSsId(conf);
115   Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
116   sendREQ(signal, ss);
117 }
118 
119 void
sendEND_LCPREQ(Signal * signal,Uint32 ssId,SectionHandle * handle)120 PgmanProxy::sendEND_LCPREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
121 {
122   Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
123 
124   EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
125   *req = ss.m_req;
126   req->senderData = ssId;
127   req->senderRef = reference();
128   sendSignalNoRelease(workerRef(ss.m_worker), GSN_END_LCPREQ,
129                       signal, EndLcpReq::SignalLength, JBB, handle);
130 }
131 
132 void
execEND_LCPCONF(Signal * signal)133 PgmanProxy::execEND_LCPCONF(Signal* signal)
134 {
135   const EndLcpConf* conf = (EndLcpConf*)signal->getDataPtr();
136   Uint32 ssId = conf->senderData;
137   Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
138   recvCONF(signal, ss);
139 }
140 
141 void
sendEND_LCPCONF(Signal * signal,Uint32 ssId)142 PgmanProxy::sendEND_LCPCONF(Signal* signal, Uint32 ssId)
143 {
144   Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
145   BlockReference senderRef = ss.m_req.senderRef;
146 
147   if (!lastReply(ss)) {
148     jam();
149     return;
150   }
151 
152   if (!ss.m_extraLast)
153   {
154     jam();
155     ss.m_extraLast = true;
156     ss.m_worker = c_workers - 1; // send to last PGMAN
157     ss.m_workerMask.set(ss.m_worker);
158     SectionHandle handle(this);
159     (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
160     return;
161   }
162 
163   if (ss.m_error == 0) {
164     jam();
165     EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
166     conf->senderData = ss.m_req.senderData;
167     conf->senderRef = reference();
168     sendSignal(senderRef, GSN_END_LCPCONF,
169                signal, EndLcpConf::SignalLength, JBB);
170   } else {
171     ndbrequire(false);
172   }
173 
174   ssRelease<Ss_END_LCPREQ>(ssId);
175 }
176 
177 // client methods
178 
/*
 * For the following methods the caller must be block instance 0.
 * The extra worker, which is in our thread, is used.  The pages
 * involved are extent pages.
 */
183 
184 int
get_page(Page_cache_client & caller,Signal * signal,Page_cache_client::Request & req,Uint32 flags)185 PgmanProxy::get_page(Page_cache_client& caller,
186                      Signal* signal,
187                      Page_cache_client::Request& req, Uint32 flags)
188 {
189   ndbrequire(blockToInstance(caller.m_block) == 0);
190   SimulatedBlock* block = globalData.getBlock(caller.m_block);
191   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
192   Page_cache_client pgman(block, worker);
193   int ret = pgman.get_page(signal, req, flags);
194   caller.m_ptr = pgman.m_ptr;
195   return ret;
196 }
197 
198 void
update_lsn(Page_cache_client & caller,Local_key key,Uint64 lsn)199 PgmanProxy::update_lsn(Page_cache_client& caller,
200                        Local_key key, Uint64 lsn)
201 {
202   ndbrequire(blockToInstance(caller.m_block) == 0);
203   SimulatedBlock* block = globalData.getBlock(caller.m_block);
204   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
205   Page_cache_client pgman(block, worker);
206   pgman.update_lsn(key, lsn);
207 }
208 
209 int
drop_page(Page_cache_client & caller,Local_key key,Uint32 page_id)210 PgmanProxy::drop_page(Page_cache_client& caller,
211                       Local_key key, Uint32 page_id)
212 {
213   ndbrequire(blockToInstance(caller.m_block) == 0);
214   SimulatedBlock* block = globalData.getBlock(caller.m_block);
215   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
216   Page_cache_client pgman(block, worker);
217   int ret = pgman.drop_page(key, page_id);
218   return ret;
219 }
220 
/*
 * The following methods contact all workers.  First the method is
 * called directly on the extra worker.  Then DATA_FILE_ORD is sent
 * to the LQH workers.  The results must be the same since the
 * configurations are identical.
 */
226 
227 Uint32
create_data_file(Signal * signal)228 PgmanProxy::create_data_file(Signal* signal)
229 {
230   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
231   Uint32 ret = worker->create_data_file();
232   Uint32 i;
233   for (i = 0; i < c_workers - 1; i++) {
234     jam();
235     send_data_file_ord(signal, i, ret,
236                        DataFileOrd::CreateDataFile);
237   }
238   return ret;
239 }
240 
241 Uint32
alloc_data_file(Signal * signal,Uint32 file_no)242 PgmanProxy::alloc_data_file(Signal* signal, Uint32 file_no)
243 {
244   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
245   Uint32 ret = worker->alloc_data_file(file_no);
246   Uint32 i;
247   for (i = 0; i < c_workers - 1; i++) {
248     jam();
249     send_data_file_ord(signal, i, ret,
250                        DataFileOrd::AllocDataFile, file_no);
251   }
252   return ret;
253 }
254 
255 void
map_file_no(Signal * signal,Uint32 file_no,Uint32 fd)256 PgmanProxy::map_file_no(Signal* signal, Uint32 file_no, Uint32 fd)
257 {
258   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
259   worker->map_file_no(file_no, fd);
260   Uint32 i;
261   for (i = 0; i < c_workers - 1; i++) {
262     jam();
263     send_data_file_ord(signal, i, ~(Uint32)0,
264                        DataFileOrd::MapFileNo, file_no, fd);
265   }
266 }
267 
268 void
free_data_file(Signal * signal,Uint32 file_no,Uint32 fd)269 PgmanProxy::free_data_file(Signal* signal, Uint32 file_no, Uint32 fd)
270 {
271   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
272   worker->free_data_file(file_no, fd);
273   Uint32 i;
274   for (i = 0; i < c_workers - 1; i++) {
275     jam();
276     send_data_file_ord(signal, i, ~(Uint32)0,
277                        DataFileOrd::FreeDataFile, file_no, fd);
278   }
279 }
280 
281 void
send_data_file_ord(Signal * signal,Uint32 i,Uint32 ret,Uint32 cmd,Uint32 file_no,Uint32 fd)282 PgmanProxy::send_data_file_ord(Signal* signal, Uint32 i, Uint32 ret,
283                                Uint32 cmd, Uint32 file_no, Uint32 fd)
284 {
285   DataFileOrd* ord = (DataFileOrd*)signal->getDataPtrSend();
286   ord->ret = ret;
287   ord->cmd = cmd;
288   ord->file_no = file_no;
289   ord->fd = fd;
290   sendSignal(workerRef(i), GSN_DATA_FILE_ORD,
291              signal, DataFileOrd::SignalLength, JBB);
292 }
293 
// NOTE(review): macro defined outside this file; presumably emits the
// per-block boilerplate needed by the addRecSignal() registrations in
// the constructor - confirm against its definition.
BLOCK_FUNCTIONS(PgmanProxy)
295