1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
22
23 #include "PgmanProxy.hpp"
24 #include "pgman.hpp"
25 #include <signaldata/DataFileOrd.hpp>
26
27 #define JAM_FILE_ID 470
28
29
PgmanProxy(Block_context & ctx)30 PgmanProxy::PgmanProxy(Block_context& ctx) :
31 LocalProxy(PGMAN, ctx)
32 {
33 // GSN_LCP_FRAG_ORD
34 addRecSignal(GSN_LCP_FRAG_ORD, &PgmanProxy::execLCP_FRAG_ORD);
35
36 // GSN_END_LCPREQ
37 addRecSignal(GSN_END_LCPREQ, &PgmanProxy::execEND_LCPREQ);
38 addRecSignal(GSN_END_LCPCONF, &PgmanProxy::execEND_LCPCONF);
39 addRecSignal(GSN_RELEASE_PAGES_CONF, &PgmanProxy::execRELEASE_PAGES_CONF);
40 }
41
~PgmanProxy()42 PgmanProxy::~PgmanProxy()
43 {
44 }
45
46 SimulatedBlock*
newWorker(Uint32 instanceNo)47 PgmanProxy::newWorker(Uint32 instanceNo)
48 {
49 return new Pgman(m_ctx, instanceNo);
50 }
51
52 // GSN_LCP_FRAG_ORD
53
54 void
execLCP_FRAG_ORD(Signal * signal)55 PgmanProxy::execLCP_FRAG_ORD(Signal* signal)
56 {
57 const LcpFragOrd* req = (const LcpFragOrd*)signal->getDataPtr();
58 Uint32 ssId = getSsId(req);
59 Ss_LCP_FRAG_ORD& ss = ssSeize<Ss_LCP_FRAG_ORD>(ssId);
60 ss.m_req = *req;
61 sendREQ(signal, ss);
62 ssRelease<Ss_LCP_FRAG_ORD>(ssId);
63 }
64
65 void
sendLCP_FRAG_ORD(Signal * signal,Uint32 ssId,SectionHandle * handle)66 PgmanProxy::sendLCP_FRAG_ORD(Signal* signal, Uint32 ssId, SectionHandle* handle)
67 {
68 Ss_LCP_FRAG_ORD& ss = ssFind<Ss_LCP_FRAG_ORD>(ssId);
69 LcpFragOrd* req = (LcpFragOrd*)signal->getDataPtrSend();
70 *req = ss.m_req;
71 sendSignalNoRelease(workerRef(ss.m_worker), GSN_LCP_FRAG_ORD,
72 signal, LcpFragOrd::SignalLength, JBB, handle);
73 }
74
75 // GSN_END_LCPREQ
76
77 void
execEND_LCPREQ(Signal * signal)78 PgmanProxy::execEND_LCPREQ(Signal* signal)
79 {
80 const EndLcpReq* req = (const EndLcpReq*)signal->getDataPtr();
81 Uint32 ssId = getSsId(req);
82 Ss_END_LCPREQ& ss = ssSeize<Ss_END_LCPREQ>(ssId);
83 ss.m_req = *req;
84
85 const Uint32 sb = refToBlock(ss.m_req.senderRef);
86 ndbrequire(sb == DBLQH || sb == LGMAN);
87
88 if (sb == LGMAN) {
89 jam();
90 /*
91 * At end of UNDO execution. Extra PGMAN worker was used to
92 * read up TUP pages. Release these pages now.
93 */
94 ReleasePagesReq* req = (ReleasePagesReq*)signal->getDataPtrSend();
95 req->senderData = ssId;
96 req->senderRef = reference();
97 req->requestType = ReleasePagesReq::RT_RELEASE_UNLOCKED;
98 req->requestData = 0;
99 // Extra worker
100 sendSignal(workerRef(c_workers - 1), GSN_RELEASE_PAGES_REQ,
101 signal, ReleasePagesReq::SignalLength, JBB);
102 return;
103 }
104 /**
105 * Send to extra PGMAN *after* all other PGMAN has completed
106 */
107 sendREQ(signal, ss, /* skip last */ true);
108 }
109
110 void
execRELEASE_PAGES_CONF(Signal * signal)111 PgmanProxy::execRELEASE_PAGES_CONF(Signal* signal)
112 {
113 const ReleasePagesConf* conf = (const ReleasePagesConf*)signal->getDataPtr();
114 Uint32 ssId = getSsId(conf);
115 Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
116 sendREQ(signal, ss);
117 }
118
119 void
sendEND_LCPREQ(Signal * signal,Uint32 ssId,SectionHandle * handle)120 PgmanProxy::sendEND_LCPREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
121 {
122 Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
123
124 EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
125 *req = ss.m_req;
126 req->senderData = ssId;
127 req->senderRef = reference();
128 sendSignalNoRelease(workerRef(ss.m_worker), GSN_END_LCPREQ,
129 signal, EndLcpReq::SignalLength, JBB, handle);
130 }
131
132 void
execEND_LCPCONF(Signal * signal)133 PgmanProxy::execEND_LCPCONF(Signal* signal)
134 {
135 const EndLcpConf* conf = (EndLcpConf*)signal->getDataPtr();
136 Uint32 ssId = conf->senderData;
137 Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
138 recvCONF(signal, ss);
139 }
140
141 void
sendEND_LCPCONF(Signal * signal,Uint32 ssId)142 PgmanProxy::sendEND_LCPCONF(Signal* signal, Uint32 ssId)
143 {
144 Ss_END_LCPREQ& ss = ssFind<Ss_END_LCPREQ>(ssId);
145 BlockReference senderRef = ss.m_req.senderRef;
146
147 if (!lastReply(ss)) {
148 jam();
149 return;
150 }
151
152 if (!ss.m_extraLast)
153 {
154 jam();
155 ss.m_extraLast = true;
156 ss.m_worker = c_workers - 1; // send to last PGMAN
157 ss.m_workerMask.set(ss.m_worker);
158 SectionHandle handle(this);
159 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
160 return;
161 }
162
163 if (ss.m_error == 0) {
164 jam();
165 EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
166 conf->senderData = ss.m_req.senderData;
167 conf->senderRef = reference();
168 sendSignal(senderRef, GSN_END_LCPCONF,
169 signal, EndLcpConf::SignalLength, JBB);
170 } else {
171 ndbrequire(false);
172 }
173
174 ssRelease<Ss_END_LCPREQ>(ssId);
175 }
176
177 // client methods
178
179 /*
180 * Here caller must have instance 0. The extra worker in our
181 * thread is used. These are extent pages.
182 */
183
184 int
get_page(Page_cache_client & caller,Signal * signal,Page_cache_client::Request & req,Uint32 flags)185 PgmanProxy::get_page(Page_cache_client& caller,
186 Signal* signal,
187 Page_cache_client::Request& req, Uint32 flags)
188 {
189 ndbrequire(blockToInstance(caller.m_block) == 0);
190 SimulatedBlock* block = globalData.getBlock(caller.m_block);
191 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
192 Page_cache_client pgman(block, worker);
193 int ret = pgman.get_page(signal, req, flags);
194 caller.m_ptr = pgman.m_ptr;
195 return ret;
196 }
197
198 void
update_lsn(Page_cache_client & caller,Local_key key,Uint64 lsn)199 PgmanProxy::update_lsn(Page_cache_client& caller,
200 Local_key key, Uint64 lsn)
201 {
202 ndbrequire(blockToInstance(caller.m_block) == 0);
203 SimulatedBlock* block = globalData.getBlock(caller.m_block);
204 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
205 Page_cache_client pgman(block, worker);
206 pgman.update_lsn(key, lsn);
207 }
208
209 int
drop_page(Page_cache_client & caller,Local_key key,Uint32 page_id)210 PgmanProxy::drop_page(Page_cache_client& caller,
211 Local_key key, Uint32 page_id)
212 {
213 ndbrequire(blockToInstance(caller.m_block) == 0);
214 SimulatedBlock* block = globalData.getBlock(caller.m_block);
215 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
216 Page_cache_client pgman(block, worker);
217 int ret = pgman.drop_page(key, page_id);
218 return ret;
219 }
220
221 /*
222 * Following contact all workers. First the method is called
223 * on extra worker. Then DATA_FILE_ORD is sent to LQH workers.
224 * The result must be same since configurations are identical.
225 */
226
227 Uint32
create_data_file(Signal * signal)228 PgmanProxy::create_data_file(Signal* signal)
229 {
230 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
231 Uint32 ret = worker->create_data_file();
232 Uint32 i;
233 for (i = 0; i < c_workers - 1; i++) {
234 jam();
235 send_data_file_ord(signal, i, ret,
236 DataFileOrd::CreateDataFile);
237 }
238 return ret;
239 }
240
241 Uint32
alloc_data_file(Signal * signal,Uint32 file_no)242 PgmanProxy::alloc_data_file(Signal* signal, Uint32 file_no)
243 {
244 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
245 Uint32 ret = worker->alloc_data_file(file_no);
246 Uint32 i;
247 for (i = 0; i < c_workers - 1; i++) {
248 jam();
249 send_data_file_ord(signal, i, ret,
250 DataFileOrd::AllocDataFile, file_no);
251 }
252 return ret;
253 }
254
255 void
map_file_no(Signal * signal,Uint32 file_no,Uint32 fd)256 PgmanProxy::map_file_no(Signal* signal, Uint32 file_no, Uint32 fd)
257 {
258 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
259 worker->map_file_no(file_no, fd);
260 Uint32 i;
261 for (i = 0; i < c_workers - 1; i++) {
262 jam();
263 send_data_file_ord(signal, i, ~(Uint32)0,
264 DataFileOrd::MapFileNo, file_no, fd);
265 }
266 }
267
268 void
free_data_file(Signal * signal,Uint32 file_no,Uint32 fd)269 PgmanProxy::free_data_file(Signal* signal, Uint32 file_no, Uint32 fd)
270 {
271 Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
272 worker->free_data_file(file_no, fd);
273 Uint32 i;
274 for (i = 0; i < c_workers - 1; i++) {
275 jam();
276 send_data_file_ord(signal, i, ~(Uint32)0,
277 DataFileOrd::FreeDataFile, file_no, fd);
278 }
279 }
280
281 void
send_data_file_ord(Signal * signal,Uint32 i,Uint32 ret,Uint32 cmd,Uint32 file_no,Uint32 fd)282 PgmanProxy::send_data_file_ord(Signal* signal, Uint32 i, Uint32 ret,
283 Uint32 cmd, Uint32 file_no, Uint32 fd)
284 {
285 DataFileOrd* ord = (DataFileOrd*)signal->getDataPtrSend();
286 ord->ret = ret;
287 ord->cmd = cmd;
288 ord->file_no = file_no;
289 ord->fd = fd;
290 sendSignal(workerRef(i), GSN_DATA_FILE_ORD,
291 signal, DataFileOrd::SignalLength, JBB);
292 }
293
294 BLOCK_FUNCTIONS(PgmanProxy)
295