1 /*
2    Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 #include "PgmanProxy.hpp"
25 #include "pgman.hpp"
26 #include <signaldata/DataFileOrd.hpp>
27 
28 #define JAM_FILE_ID 470
29 
30 
PgmanProxy(Block_context & ctx)31 PgmanProxy::PgmanProxy(Block_context& ctx) :
32   LocalProxy(PGMAN, ctx)
33 {
34   // GSN_SYNC_EXTENT_PAGES_REQ
35   addRecSignal(GSN_SYNC_EXTENT_PAGES_REQ,
36                &PgmanProxy::execSYNC_EXTENT_PAGES_REQ);
37   // GSN_END_LCPREQ
38   addRecSignal(GSN_END_LCPREQ, &PgmanProxy::execEND_LCPREQ);
39   addRecSignal(GSN_END_LCPCONF, &PgmanProxy::execEND_LCPCONF);
40   addRecSignal(GSN_RELEASE_PAGES_CONF, &PgmanProxy::execRELEASE_PAGES_CONF);
41 }
42 
~PgmanProxy()43 PgmanProxy::~PgmanProxy()
44 {
45 }
46 
47 SimulatedBlock*
newWorker(Uint32 instanceNo)48 PgmanProxy::newWorker(Uint32 instanceNo)
49 {
50   return new Pgman(m_ctx, instanceNo);
51 }
52 
53 // GSN_SYNC_EXTENT_PAGES_REQ
54 void
execSYNC_EXTENT_PAGES_REQ(Signal * signal)55 PgmanProxy::execSYNC_EXTENT_PAGES_REQ(Signal *signal)
56 {
57   // Route signal on to extra PGMAN worker that handles extent pages
58   // The return signal will be sent directly from there to sender
59   // Same data sent, so proxy block is merely a router here.
60   jamEntry();
61   sendSignal(workerRef(c_workers - 1), GSN_SYNC_EXTENT_PAGES_REQ, signal,
62              SyncExtentPagesReq::SignalLength, JBB);
63   return;
64 }
65 
66 // GSN_END_LCPREQ
67 
68 void
execEND_LCPREQ(Signal * signal)69 PgmanProxy::execEND_LCPREQ(Signal* signal)
70 {
71   const EndLcpReq* req = (const EndLcpReq*)signal->getDataPtr();
72   Uint32 ssId = getSsId(req);
73   Ss_END_LCPREQ& ss = ssSeize<Ss_END_LCPREQ>(ssId);
74   ss.m_req = *req;
75 
76   const Uint32 sb = refToBlock(ss.m_req.senderRef);
77   ndbrequire(sb == DBLQH || sb == LGMAN);
78 
79   ndbrequire(sb == LGMAN);
80   {
81     jam();
82     /*
83      * At end of UNDO execution.  Extra PGMAN worker was used to
84      * read up TUP pages.  Release these pages now.
85      */
86     ReleasePagesReq* req = (ReleasePagesReq*)signal->getDataPtrSend();
87     req->senderData = ssId;
88     req->senderRef = reference();
89     req->requestType = ReleasePagesReq::RT_RELEASE_UNLOCKED;
90     req->requestData = 0;
91     // Extra worker
92     sendSignal(workerRef(c_workers - 1), GSN_RELEASE_PAGES_REQ,
93                signal, ReleasePagesReq::SignalLength, JBB);
94     return;
95   }
96 }
97 
98 void
execRELEASE_PAGES_CONF(Signal * signal)99 PgmanProxy::execRELEASE_PAGES_CONF(Signal* signal)
100 {
101   jam();
102   const ReleasePagesConf* conf = (const ReleasePagesConf*)signal->getDataPtr();
103   Uint32 ssId = getSsId(conf);
104   Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
105   sendREQ(signal, ss);
106 }
107 
108 void
sendEND_LCPREQ(Signal * signal,Uint32 ssId,SectionHandle * handle)109 PgmanProxy::sendEND_LCPREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
110 {
111   jam();
112   Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
113 
114   EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
115   *req = ss.m_req;
116   req->senderData = ssId;
117   req->senderRef = reference();
118   sendSignalNoRelease(workerRef(ss.m_worker), GSN_END_LCPREQ,
119                       signal, EndLcpReq::SignalLength, JBB, handle);
120 }
121 
122 void
execEND_LCPCONF(Signal * signal)123 PgmanProxy::execEND_LCPCONF(Signal* signal)
124 {
125   jam();
126   const EndLcpConf* conf = (EndLcpConf*)signal->getDataPtr();
127   Uint32 ssId = conf->senderData;
128   Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
129   recvCONF(signal, ss);
130 }
131 
132 void
sendEND_LCPCONF(Signal * signal,Uint32 ssId)133 PgmanProxy::sendEND_LCPCONF(Signal* signal, Uint32 ssId)
134 {
135   jam();
136   Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
137   BlockReference senderRef = ss.m_req.senderRef;
138 
139   if (!lastReply(ss))
140   {
141     jam();
142     return;
143   }
144 
145   if (!ss.m_extraLast)
146   {
147     jam();
148     ss.m_extraLast = true;
149     ss.m_worker = c_workers - 1; // send to last PGMAN
150     ss.m_workerMask.set(ss.m_worker);
151     SectionHandle handle(this);
152     (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
153     return;
154   }
155 
156   if (ss.m_error == 0)
157   {
158     jam();
159     EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
160     conf->senderData = ss.m_req.senderData;
161     conf->senderRef = reference();
162     sendSignal(senderRef, GSN_END_LCPCONF,
163                signal, EndLcpConf::SignalLength, JBB);
164   }
165   else
166   {
167     ndbabort();
168   }
169 
170   ssRelease<Ss_END_LCPREQ>(ssId);
171 }
172 
173 // client methods
174 
/*
 * For the methods below the caller must have instance 0.  The call
 * is served by the extra worker, which is in our thread.  These are
 * extent pages.
 */
179 
180 void
get_extent_page(Page_cache_client & caller,Signal * signal,Page_cache_client::Request & req,Uint32 flags)181 PgmanProxy::get_extent_page(Page_cache_client& caller,
182                             Signal* signal,
183                             Page_cache_client::Request& req,
184                             Uint32 flags)
185 {
186   ndbrequire(blockToInstance(caller.m_block) == 0);
187   SimulatedBlock* block = globalData.getBlock(caller.m_block);
188   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
189   Page_cache_client pgman(block, worker);
190   pgman.get_extent_page(signal, req, flags);
191   caller.m_ptr = pgman.m_ptr;
192 }
193 
194 int
get_page(Page_cache_client & caller,Signal * signal,Page_cache_client::Request & req,Uint32 flags)195 PgmanProxy::get_page(Page_cache_client& caller,
196                      Signal* signal,
197                      Page_cache_client::Request& req, Uint32 flags)
198 {
199   ndbrequire(blockToInstance(caller.m_block) == 0);
200   SimulatedBlock* block = globalData.getBlock(caller.m_block);
201   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
202   Page_cache_client pgman(block, worker);
203   int ret = pgman.get_page(signal, req, flags);
204   caller.m_ptr = pgman.m_ptr;
205   return ret;
206 }
207 
208 void
update_lsn(Signal * signal,Page_cache_client & caller,Local_key key,Uint64 lsn)209 PgmanProxy::update_lsn(Signal *signal,
210                        Page_cache_client& caller,
211                        Local_key key,
212                        Uint64 lsn)
213 {
214   ndbrequire(blockToInstance(caller.m_block) == 0);
215   SimulatedBlock* block = globalData.getBlock(caller.m_block);
216   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
217   Page_cache_client pgman(block, worker);
218   pgman.update_lsn(signal, key, lsn);
219 }
220 
221 int
drop_page(Page_cache_client & caller,Local_key key,Uint32 page_id)222 PgmanProxy::drop_page(Page_cache_client& caller,
223                       Local_key key, Uint32 page_id)
224 {
225   ndbrequire(blockToInstance(caller.m_block) == 0);
226   SimulatedBlock* block = globalData.getBlock(caller.m_block);
227   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
228   Page_cache_client pgman(block, worker);
229   int ret = pgman.drop_page(key, page_id);
230   return ret;
231 }
232 
/*
 * The following methods contact all workers.  First the method is
 * called on the extra worker.  Then DATA_FILE_ORD is sent to the
 * LQH workers.  The result must be the same in all workers since
 * their configurations are identical.
 */
238 
239 Uint32
create_data_file(Signal * signal,Uint32 version)240 PgmanProxy::create_data_file(Signal* signal, Uint32 version)
241 {
242   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
243   Uint32 ret = worker->create_data_file(version);
244   Uint32 i;
245   for (i = 0; i < c_workers - 1; i++) {
246     jam();
247     send_data_file_ord(signal, i, ret, version,
248                        DataFileOrd::CreateDataFile);
249   }
250   return ret;
251 }
252 
253 Uint32
alloc_data_file(Signal * signal,Uint32 file_no,Uint32 version)254 PgmanProxy::alloc_data_file(Signal* signal, Uint32 file_no, Uint32 version)
255 {
256   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
257   Uint32 ret = worker->alloc_data_file(file_no, version);
258   Uint32 i;
259   for (i = 0; i < c_workers - 1; i++) {
260     jam();
261     send_data_file_ord(signal, i, ret, version,
262                        DataFileOrd::AllocDataFile, file_no);
263   }
264   return ret;
265 }
266 
267 void
map_file_no(Signal * signal,Uint32 file_no,Uint32 fd)268 PgmanProxy::map_file_no(Signal* signal, Uint32 file_no, Uint32 fd)
269 {
270   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
271   worker->map_file_no(file_no, fd);
272   Uint32 i;
273   for (i = 0; i < c_workers - 1; i++) {
274     jam();
275     send_data_file_ord(signal, i, ~(Uint32)0, 0,
276                        DataFileOrd::MapFileNo, file_no, fd);
277   }
278 }
279 
280 void
free_data_file(Signal * signal,Uint32 file_no,Uint32 fd)281 PgmanProxy::free_data_file(Signal* signal, Uint32 file_no, Uint32 fd)
282 {
283   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
284   worker->free_data_file(file_no, fd);
285   Uint32 i;
286   for (i = 0; i < c_workers - 1; i++) {
287     jam();
288     send_data_file_ord(signal, i, ~(Uint32)0, 0,
289                        DataFileOrd::FreeDataFile, file_no, fd);
290   }
291 }
292 
293 void
send_data_file_ord(Signal * signal,Uint32 i,Uint32 ret,Uint32 version,Uint32 cmd,Uint32 file_no,Uint32 fd)294 PgmanProxy::send_data_file_ord(Signal* signal,
295                                Uint32 i,
296                                Uint32 ret,
297                                Uint32 version,
298                                Uint32 cmd,
299                                Uint32 file_no,
300                                Uint32 fd)
301 {
302   DataFileOrd* ord = (DataFileOrd*)signal->getDataPtrSend();
303   ord->ret = ret;
304   ord->version = version;
305   ord->cmd = cmd;
306   ord->file_no = file_no;
307   ord->fd = fd;
308   sendSignal(workerRef(i), GSN_DATA_FILE_ORD,
309              signal, DataFileOrd::SignalLength, JBB);
310 }
311 
312 bool
extent_pages_available(Uint32 pages_needed,Page_cache_client & caller)313 PgmanProxy::extent_pages_available(Uint32 pages_needed,
314                                           Page_cache_client& caller)
315 {
316   ndbrequire(blockToInstance(caller.m_block) == 0);
317   Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
318   return worker->extent_pages_available(pages_needed);
319 }
320 
// Project macro defined outside this file; presumably generates the
// per-block boilerplate for PgmanProxy -- confirm against its definition.
BLOCK_FUNCTIONS(PgmanProxy)
322