1 /*
2 Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24 #include "PgmanProxy.hpp"
25 #include "pgman.hpp"
26 #include <signaldata/DataFileOrd.hpp>
27
28 #define JAM_FILE_ID 470
29
30
PgmanProxy(Block_context & ctx)31 PgmanProxy::PgmanProxy(Block_context& ctx) :
32 LocalProxy(PGMAN, ctx)
33 {
34 // GSN_SYNC_EXTENT_PAGES_REQ
35 addRecSignal(GSN_SYNC_EXTENT_PAGES_REQ,
36 &PgmanProxy::execSYNC_EXTENT_PAGES_REQ);
37 // GSN_END_LCPREQ
38 addRecSignal(GSN_END_LCPREQ, &PgmanProxy::execEND_LCPREQ);
39 addRecSignal(GSN_END_LCPCONF, &PgmanProxy::execEND_LCPCONF);
40 addRecSignal(GSN_RELEASE_PAGES_CONF, &PgmanProxy::execRELEASE_PAGES_CONF);
41 }
42
~PgmanProxy()43 PgmanProxy::~PgmanProxy()
44 {
45 }
46
47 SimulatedBlock*
newWorker(Uint32 instanceNo)48 PgmanProxy::newWorker(Uint32 instanceNo)
49 {
50 return new Pgman(m_ctx, instanceNo);
51 }
52
53 // GSN_SYNC_EXTENT_PAGES_REQ
54 void
execSYNC_EXTENT_PAGES_REQ(Signal * signal)55 PgmanProxy::execSYNC_EXTENT_PAGES_REQ(Signal *signal)
56 {
57 // Route signal on to extra PGMAN worker that handles extent pages
58 // The return signal will be sent directly from there to sender
59 // Same data sent, so proxy block is merely a router here.
60 jamEntry();
61 sendSignal(workerRef(c_workers - 1), GSN_SYNC_EXTENT_PAGES_REQ, signal,
62 SyncExtentPagesReq::SignalLength, JBB);
63 return;
64 }
65
66 // GSN_END_LCPREQ
67
68 void
execEND_LCPREQ(Signal * signal)69 PgmanProxy::execEND_LCPREQ(Signal* signal)
70 {
71 const EndLcpReq* req = (const EndLcpReq*)signal->getDataPtr();
72 Uint32 ssId = getSsId(req);
73 Ss_END_LCPREQ& ss = ssSeize<Ss_END_LCPREQ>(ssId);
74 ss.m_req = *req;
75
76 const Uint32 sb = refToBlock(ss.m_req.senderRef);
77 ndbrequire(sb == DBLQH || sb == LGMAN);
78
79 ndbrequire(sb == LGMAN);
80 {
81 jam();
82 /*
83 * At end of UNDO execution. Extra PGMAN worker was used to
84 * read up TUP pages. Release these pages now.
85 */
86 ReleasePagesReq* req = (ReleasePagesReq*)signal->getDataPtrSend();
87 req->senderData = ssId;
88 req->senderRef = reference();
89 req->requestType = ReleasePagesReq::RT_RELEASE_UNLOCKED;
90 req->requestData = 0;
91 // Extra worker
92 sendSignal(workerRef(c_workers - 1), GSN_RELEASE_PAGES_REQ,
93 signal, ReleasePagesReq::SignalLength, JBB);
94 return;
95 }
96 }
97
98 void
execRELEASE_PAGES_CONF(Signal * signal)99 PgmanProxy::execRELEASE_PAGES_CONF(Signal* signal)
100 {
101 jam();
102 const ReleasePagesConf* conf = (const ReleasePagesConf*)signal->getDataPtr();
103 Uint32 ssId = getSsId(conf);
104 Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
105 sendREQ(signal, ss);
106 }
107
108 void
sendEND_LCPREQ(Signal * signal,Uint32 ssId,SectionHandle * handle)109 PgmanProxy::sendEND_LCPREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
110 {
111 jam();
112 Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
113
114 EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
115 *req = ss.m_req;
116 req->senderData = ssId;
117 req->senderRef = reference();
118 sendSignalNoRelease(workerRef(ss.m_worker), GSN_END_LCPREQ,
119 signal, EndLcpReq::SignalLength, JBB, handle);
120 }
121
122 void
execEND_LCPCONF(Signal * signal)123 PgmanProxy::execEND_LCPCONF(Signal* signal)
124 {
125 jam();
126 const EndLcpConf* conf = (EndLcpConf*)signal->getDataPtr();
127 Uint32 ssId = conf->senderData;
128 Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
129 recvCONF(signal, ss);
130 }
131
132 void
sendEND_LCPCONF(Signal * signal,Uint32 ssId)133 PgmanProxy::sendEND_LCPCONF(Signal* signal, Uint32 ssId)
134 {
135 jam();
136 Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
137 BlockReference senderRef = ss.m_req.senderRef;
138
139 if (!lastReply(ss))
140 {
141 jam();
142 return;
143 }
144
145 if (!ss.m_extraLast)
146 {
147 jam();
148 ss.m_extraLast = true;
149 ss.m_worker = c_workers - 1; // send to last PGMAN
150 ss.m_workerMask.set(ss.m_worker);
151 SectionHandle handle(this);
152 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
153 return;
154 }
155
156 if (ss.m_error == 0)
157 {
158 jam();
159 EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
160 conf->senderData = ss.m_req.senderData;
161 conf->senderRef = reference();
162 sendSignal(senderRef, GSN_END_LCPCONF,
163 signal, EndLcpConf::SignalLength, JBB);
164 }
165 else
166 {
167 ndbabort();
168 }
169
170 ssRelease<Ss_END_LCPREQ>(ssId);
171 }
172
173 // client methods
174
175 /*
176 * Here caller must have instance 0. The extra worker in our
177 * thread is used. These are extent pages.
178 */
179
180 void
get_extent_page(Page_cache_client & caller,Signal * signal,Page_cache_client::Request & req,Uint32 flags)181 PgmanProxy::get_extent_page(Page_cache_client& caller,
182 Signal* signal,
183 Page_cache_client::Request& req,
184 Uint32 flags)
185 {
186 ndbrequire(blockToInstance(caller.m_block) == 0);
187 SimulatedBlock* block = globalData.getBlock(caller.m_block);
188 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
189 Page_cache_client pgman(block, worker);
190 pgman.get_extent_page(signal, req, flags);
191 caller.m_ptr = pgman.m_ptr;
192 }
193
194 int
get_page(Page_cache_client & caller,Signal * signal,Page_cache_client::Request & req,Uint32 flags)195 PgmanProxy::get_page(Page_cache_client& caller,
196 Signal* signal,
197 Page_cache_client::Request& req, Uint32 flags)
198 {
199 ndbrequire(blockToInstance(caller.m_block) == 0);
200 SimulatedBlock* block = globalData.getBlock(caller.m_block);
201 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
202 Page_cache_client pgman(block, worker);
203 int ret = pgman.get_page(signal, req, flags);
204 caller.m_ptr = pgman.m_ptr;
205 return ret;
206 }
207
208 void
update_lsn(Signal * signal,Page_cache_client & caller,Local_key key,Uint64 lsn)209 PgmanProxy::update_lsn(Signal *signal,
210 Page_cache_client& caller,
211 Local_key key,
212 Uint64 lsn)
213 {
214 ndbrequire(blockToInstance(caller.m_block) == 0);
215 SimulatedBlock* block = globalData.getBlock(caller.m_block);
216 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
217 Page_cache_client pgman(block, worker);
218 pgman.update_lsn(signal, key, lsn);
219 }
220
221 int
drop_page(Page_cache_client & caller,Local_key key,Uint32 page_id)222 PgmanProxy::drop_page(Page_cache_client& caller,
223 Local_key key, Uint32 page_id)
224 {
225 ndbrequire(blockToInstance(caller.m_block) == 0);
226 SimulatedBlock* block = globalData.getBlock(caller.m_block);
227 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
228 Page_cache_client pgman(block, worker);
229 int ret = pgman.drop_page(key, page_id);
230 return ret;
231 }
232
233 /*
234 * Following contact all workers. First the method is called
235 * on extra worker. Then DATA_FILE_ORD is sent to LQH workers.
236 * The result must be same since configurations are identical.
237 */
238
239 Uint32
create_data_file(Signal * signal,Uint32 version)240 PgmanProxy::create_data_file(Signal* signal, Uint32 version)
241 {
242 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
243 Uint32 ret = worker->create_data_file(version);
244 Uint32 i;
245 for (i = 0; i < c_workers - 1; i++) {
246 jam();
247 send_data_file_ord(signal, i, ret, version,
248 DataFileOrd::CreateDataFile);
249 }
250 return ret;
251 }
252
253 Uint32
alloc_data_file(Signal * signal,Uint32 file_no,Uint32 version)254 PgmanProxy::alloc_data_file(Signal* signal, Uint32 file_no, Uint32 version)
255 {
256 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
257 Uint32 ret = worker->alloc_data_file(file_no, version);
258 Uint32 i;
259 for (i = 0; i < c_workers - 1; i++) {
260 jam();
261 send_data_file_ord(signal, i, ret, version,
262 DataFileOrd::AllocDataFile, file_no);
263 }
264 return ret;
265 }
266
267 void
map_file_no(Signal * signal,Uint32 file_no,Uint32 fd)268 PgmanProxy::map_file_no(Signal* signal, Uint32 file_no, Uint32 fd)
269 {
270 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
271 worker->map_file_no(file_no, fd);
272 Uint32 i;
273 for (i = 0; i < c_workers - 1; i++) {
274 jam();
275 send_data_file_ord(signal, i, ~(Uint32)0, 0,
276 DataFileOrd::MapFileNo, file_no, fd);
277 }
278 }
279
280 void
free_data_file(Signal * signal,Uint32 file_no,Uint32 fd)281 PgmanProxy::free_data_file(Signal* signal, Uint32 file_no, Uint32 fd)
282 {
283 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
284 worker->free_data_file(file_no, fd);
285 Uint32 i;
286 for (i = 0; i < c_workers - 1; i++) {
287 jam();
288 send_data_file_ord(signal, i, ~(Uint32)0, 0,
289 DataFileOrd::FreeDataFile, file_no, fd);
290 }
291 }
292
293 void
send_data_file_ord(Signal * signal,Uint32 i,Uint32 ret,Uint32 version,Uint32 cmd,Uint32 file_no,Uint32 fd)294 PgmanProxy::send_data_file_ord(Signal* signal,
295 Uint32 i,
296 Uint32 ret,
297 Uint32 version,
298 Uint32 cmd,
299 Uint32 file_no,
300 Uint32 fd)
301 {
302 DataFileOrd* ord = (DataFileOrd*)signal->getDataPtrSend();
303 ord->ret = ret;
304 ord->version = version;
305 ord->cmd = cmd;
306 ord->file_no = file_no;
307 ord->fd = fd;
308 sendSignal(workerRef(i), GSN_DATA_FILE_ORD,
309 signal, DataFileOrd::SignalLength, JBB);
310 }
311
312 bool
extent_pages_available(Uint32 pages_needed,Page_cache_client & caller)313 PgmanProxy::extent_pages_available(Uint32 pages_needed,
314 Page_cache_client& caller)
315 {
316 ndbrequire(blockToInstance(caller.m_block) == 0);
317 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
318 return worker->extent_pages_available(pages_needed);
319 }
320
321 BLOCK_FUNCTIONS(PgmanProxy)
322