1 /*
2 Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "atrt.hpp"
26 #include <NdbSleep.h>
27
28 static bool connect_mysqld(atrt_process* proc);
29 static bool populate_db(atrt_config&, atrt_process*);
30 static bool setup_repl(atrt_config&);
31
32 static
33 bool
run_query(atrt_process * proc,const char * query)34 run_query(atrt_process* proc, const char * query)
35 {
36 MYSQL* mysql = &proc->m_mysql;
37 g_logger.debug("'%s@%s' - Running query '%s'",
38 proc->m_cluster->m_name.c_str(),
39 proc->m_host->m_hostname.c_str(),
40 query);
41
42 if (mysql_query(mysql, query))
43 {
44 g_logger.error("'%s@%s' - Failed to run query '%s' %d:%s",
45 proc->m_cluster->m_name.c_str(),
46 proc->m_host->m_hostname.c_str(),
47 query,
48 mysql_errno(mysql),
49 mysql_error(mysql));
50 return false;
51 }
52 return true;
53 }
54
/**
 * Statements that create the atrt database and its tables.
 *
 * The list is terminated by a null pointer and is executed in order by
 * setup_db().
 */
static const char* create_sql[] = {
  /* the database itself */
  "create database atrt",

  /* make it the default database for the statements that follow */
  "use atrt",

  /* one row per host */
  "create table host ("
  " id int primary key,"
  " name varchar(255),"
  " port int unsigned,"
  " unique(name, port)"
  ") engine = myisam;",

  /* one row per cluster */
  "create table cluster ("
  " id int primary key,"
  " name varchar(255),"
  " unique(name)"
  " ) engine = myisam;",

  /* one row per process, referring to host and cluster by id */
  "create table process ("
  " id int primary key,"
  " host_id int not null,"
  " cluster_id int not null,"
  " node_id int not null,"
  " type enum ('ndbd', 'ndbapi', 'ndb_mgmd', 'mysqld', 'mysql') not null,"
  " state enum ('starting', 'started', 'stopping', 'stopped') not null"
  " ) engine = myisam;",

  /* name/value options, referring to a process by id */
  "create table options ("
  " id int primary key,"
  " process_id int not null,"
  " name varchar(255) not null,"
  " value varchar(255) not null"
  " ) engine = myisam;",

  /* master/slave pairs */
  "create table repl ("
  " id int auto_increment primary key,"
  " master_id int not null,"
  " slave_id int not null"
  " ) engine = myisam;",

  /* commands addressed to a process */
  "create table command ("
  " id int auto_increment primary key,"
  " state enum ('new', 'running', 'done') not null default 'new',"
  " cmd int not null,"
  " process_id int not null,"
  " process_args varchar(255) default NULL"
  " ) engine = myisam;",

  /* end of list */
  0};
104
105 bool
setup_db(atrt_config & config)106 setup_db(atrt_config& config)
107 {
108 /**
109 * Install atrt db
110 */
111 atrt_process* atrt_client = 0;
112 {
113 atrt_cluster* cluster = 0;
114 for (size_t i = 0; i<config.m_clusters.size(); i++)
115 {
116 if (strcmp(config.m_clusters[i]->m_name.c_str(), ".atrt") == 0)
117 {
118 cluster = config.m_clusters[i];
119
120 for (size_t i = 0; i<cluster->m_processes.size(); i++)
121 {
122 if (cluster->m_processes[i]->m_type == atrt_process::AP_CLIENT)
123 {
124 atrt_client = cluster->m_processes[i];
125 break;
126 }
127 }
128 break;
129 }
130 }
131 }
132
133 /**
134 * connect to all mysqld's
135 */
136 #ifndef _WIN32
137 for (size_t i = 0; i<config.m_processes.size(); i++)
138 {
139 atrt_process * proc = config.m_processes[i];
140 if (proc->m_type == atrt_process::AP_MYSQLD)
141 {
142 if (!connect_mysqld(config.m_processes[i]))
143 return false;
144 }
145 }
146
147 if (atrt_client)
148 {
149 atrt_process* atrt_mysqld = atrt_client->m_mysqld;
150 assert(atrt_mysqld);
151
152 // Run the commands to create the db
153 for (int i = 0; create_sql[i]; i++)
154 {
155 const char* query = create_sql[i];
156 if (!run_query(atrt_mysqld, query))
157 return false;
158 }
159
160 if (!populate_db(config, atrt_mysqld))
161 return false;
162 }
163
164 /**
165 * setup replication
166 */
167 if (setup_repl(config) != true)
168 return false;
169 #endif
170
171 return true;
172 }
173
174 static
175 const char*
find(atrt_process * proc,const char * key)176 find(atrt_process* proc, const char * key)
177 {
178 const char * res = 0;
179 if (proc->m_options.m_loaded.get(key, &res))
180 return res;
181
182 proc->m_options.m_generated.get(key, &res);
183 return res;
184 }
185
186 bool
connect_mysqld(atrt_process * proc)187 connect_mysqld(atrt_process* proc)
188 {
189 if ( !mysql_init(&proc->m_mysql))
190 {
191 g_logger.error("Failed to init mysql");
192 return false;
193 }
194
195 const char * port = find(proc, "--port=");
196 const char * socket = find(proc, "--socket=");
197 if (port == 0 && socket == 0)
198 {
199 g_logger.error("Neither socket nor port specified...cant connect to mysql");
200 return false;
201 }
202
203 for (size_t i = 0; i<20; i++)
204 {
205 if (port)
206 {
207 mysql_protocol_type val = MYSQL_PROTOCOL_TCP;
208 mysql_options(&proc->m_mysql, MYSQL_OPT_PROTOCOL, &val);
209 }
210 if (mysql_real_connect(&proc->m_mysql,
211 proc->m_host->m_hostname.c_str(),
212 "root", "", "test",
213 port ? atoi(port) : 0,
214 socket,
215 0))
216 {
217 return true;
218 }
219 g_logger.info("Retrying connect to %s:%u 3s",
220 proc->m_host->m_hostname.c_str(),atoi(port));
221 NdbSleep_SecSleep(3);
222 }
223
224 g_logger.error("Failed to connect to mysqld err: >%s< >%s:%u:%s<",
225 mysql_error(&proc->m_mysql),
226 proc->m_host->m_hostname.c_str(), port ? atoi(port) : 0,
227 socket ? socket : "<null>");
228 return false;
229 }
230
231 void
BINDI(MYSQL_BIND & bind,int * i)232 BINDI(MYSQL_BIND& bind, int * i)
233 {
234 bind.buffer_type= MYSQL_TYPE_LONG;
235 bind.buffer= (char*)i;
236 bind.is_unsigned= 0;
237 bind.is_null= 0;
238 }
239
240 void
BINDS(MYSQL_BIND & bind,const char * s,unsigned long * len)241 BINDS(MYSQL_BIND& bind, const char * s, unsigned long * len)
242 {
243 bind.buffer_type= MYSQL_TYPE_STRING;
244 bind.buffer= (char*)s;
245 bind.buffer_length= * len = strlen(s);
246 bind.length= len;
247 bind.is_null= 0;
248 }
249
250 template <typename T>
251 int
find(T * obj,Vector<T * > & arr)252 find(T* obj, Vector<T*>& arr)
253 {
254 for (size_t i = 0; i<arr.size(); i++)
255 if (arr[i] == obj)
256 return (int)i;
257 abort();
258 return -1;
259 }
260
261 static
262 bool
populate_options(MYSQL * mysql,MYSQL_STMT * stmt,int * option_id,int process_id,Properties * p)263 populate_options(MYSQL* mysql, MYSQL_STMT* stmt, int* option_id,
264 int process_id, Properties* p)
265 {
266 int kk = *option_id;
267 Properties::Iterator it(p);
268 const char * name = it.first();
269 for (; name; name = it.next())
270 {
271 int optid = kk;
272 int proc_id = process_id;
273 unsigned long l0, l1;
274 const char * value;
275 p->get(name, &value);
276 MYSQL_BIND bind2[4];
277 bzero(bind2, sizeof(bind2));
278 BINDI(bind2[0], &optid);
279 BINDI(bind2[1], &proc_id);
280 BINDS(bind2[2], name, &l0);
281 BINDS(bind2[3], value, &l1);
282
283 if (mysql_stmt_bind_param(stmt, bind2))
284 {
285 g_logger.error("Failed to bind: %s", mysql_error(mysql));
286 return false;
287 }
288
289 if (mysql_stmt_execute(stmt))
290 {
291 g_logger.error("0 Failed to execute: %s", mysql_error(mysql));
292 return false;
293 }
294 kk++;
295 }
296 *option_id = kk;
297 return true;
298 }
299
300 static
301 bool
populate_db(atrt_config & config,atrt_process * mysqld)302 populate_db(atrt_config& config, atrt_process* mysqld)
303 {
304 {
305 const char * sql = "INSERT INTO host (id, name, port) values (?, ?, ?)";
306 MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
307 if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
308 {
309 g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
310 return false;
311 }
312
313 for (size_t i = 0; i<config.m_hosts.size(); i++)
314 {
315 unsigned long l0;
316 MYSQL_BIND bind[3];
317 bzero(bind, sizeof(bind));
318 int id = i;
319 int port = config.m_hosts[i]->m_cpcd->getPort();
320 BINDI(bind[0], &id);
321 BINDS(bind[1], config.m_hosts[i]->m_hostname.c_str(), &l0);
322 BINDI(bind[2], &port);
323 if (mysql_stmt_bind_param(stmt, bind))
324 {
325 g_logger.error("Failed to bind: %s", mysql_error(&mysqld->m_mysql));
326 return false;
327 }
328
329 if (mysql_stmt_execute(stmt))
330 {
331 g_logger.error("1 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
332 return false;
333 }
334 }
335 mysql_stmt_close(stmt);
336 }
337
338 {
339 const char * sql = "INSERT INTO cluster (id, name) values (?, ?)";
340 MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
341 if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
342 {
343 g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
344 return false;
345 }
346
347 for (size_t i = 0; i<config.m_clusters.size(); i++)
348 {
349 unsigned long l0;
350 MYSQL_BIND bind[2];
351 bzero(bind, sizeof(bind));
352 int id = i;
353 BINDI(bind[0], &id);
354 BINDS(bind[1], config.m_clusters[i]->m_name.c_str(), &l0);
355
356 if (mysql_stmt_bind_param(stmt, bind))
357 {
358 g_logger.error("Failed to bind: %s", mysql_error(&mysqld->m_mysql));
359 return false;
360 }
361
362 if (mysql_stmt_execute(stmt))
363 {
364 g_logger.error("2 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
365 return false;
366 }
367 }
368 mysql_stmt_close(stmt);
369 }
370
371 {
372 const char * sql =
373 "INSERT INTO process (id, host_id, cluster_id, type, state, node_id) values (?,?,?,?,?,?)";
374
375 const char * sqlopt =
376 "INSERT INTO options (id, process_id, name, value) values (?,?,?,?)";
377
378 MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
379 if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
380 {
381 g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
382 return false;
383 }
384
385 MYSQL_STMT * stmtopt = mysql_stmt_init(&mysqld->m_mysql);
386 if (mysql_stmt_prepare(stmtopt, sqlopt, strlen(sqlopt)))
387 {
388 g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
389 return false;
390 }
391
392 int option_id = 0;
393 for (size_t i = 0; i<config.m_processes.size(); i++)
394 {
395 unsigned long l0, l1;
396 MYSQL_BIND bind[6];
397 bzero(bind, sizeof(bind));
398 int id = i;
399 atrt_process* proc = config.m_processes[i];
400 int host_id = find(proc->m_host, config.m_hosts);
401 int cluster_id = find(proc->m_cluster, config.m_clusters);
402 int node_id= proc->m_nodeid;
403
404 const char * type = 0;
405 const char * state = "started";
406 switch(proc->m_type){
407 case atrt_process::AP_NDBD: type = "ndbd"; break;
408 case atrt_process::AP_NDB_API: type = "ndbapi"; state = "stopped";break;
409 case atrt_process::AP_NDB_MGMD: type = "ndb_mgmd"; break;
410 case atrt_process::AP_MYSQLD: type = "mysqld"; break;
411 case atrt_process::AP_CLIENT: type = "mysql"; state = "stopped";break;
412 default:
413 abort();
414 }
415
416 BINDI(bind[0], &id);
417 BINDI(bind[1], &host_id);
418 BINDI(bind[2], &cluster_id);
419 BINDS(bind[3], type, &l0);
420 BINDS(bind[4], state, &l1);
421 BINDI(bind[5], &node_id);
422
423 if (mysql_stmt_bind_param(stmt, bind))
424 {
425 g_logger.error("Failed to bind: %s", mysql_error(&mysqld->m_mysql));
426 return false;
427 }
428
429 if (mysql_stmt_execute(stmt))
430 {
431 g_logger.error("3 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
432 return false;
433 }
434
435 if (populate_options(&mysqld->m_mysql, stmtopt, &option_id, id,
436 &proc->m_options.m_loaded) == false)
437 return false;
438
439 if (populate_options(&mysqld->m_mysql, stmtopt, &option_id, id,
440 &proc->m_cluster->m_options.m_loaded) == false)
441 return false;
442
443 }
444 mysql_stmt_close(stmt);
445 mysql_stmt_close(stmtopt);
446 }
447
448 return true;
449 }
450
451 static
452 bool
setup_repl(atrt_process * dst,atrt_process * src)453 setup_repl(atrt_process* dst, atrt_process* src)
454 {
455 if (!run_query(src, "STOP SLAVE"))
456 {
457 g_logger.error("Failed to stop slave: %s",
458 mysql_error(&src->m_mysql));
459 return false;
460 }
461
462 if (!run_query(src, "RESET SLAVE"))
463 {
464 g_logger.error("Failed to reset slave: %s",
465 mysql_error(&src->m_mysql));
466 return false;
467 }
468
469 BaseString tmp;
470 tmp.assfmt("CHANGE MASTER TO "
471 " MASTER_HOST='%s', "
472 " MASTER_PORT=%u ",
473 dst->m_host->m_hostname.c_str(),
474 atoi(find(dst, "--port=")));
475
476 if (!run_query(src, tmp.c_str()))
477 {
478 g_logger.error("Failed to setup repl from %s to %s: %s",
479 src->m_host->m_hostname.c_str(),
480 dst->m_host->m_hostname.c_str(),
481 mysql_error(&src->m_mysql));
482 return false;
483 }
484
485 if (!run_query(src, "START SLAVE"))
486 {
487 g_logger.error("Failed to start slave: %s",
488 mysql_error(&src->m_mysql));
489 return false;
490 }
491
492 g_logger.info("Replication from %s(%s) to %s(%s) setup",
493 src->m_host->m_hostname.c_str(),
494 src->m_cluster->m_name.c_str(),
495 dst->m_host->m_hostname.c_str(),
496 dst->m_cluster->m_name.c_str());
497
498 return true;
499 }
500
501 bool
setup_repl(atrt_config & config)502 setup_repl(atrt_config& config)
503 {
504 for (size_t i = 0; i<config.m_processes.size(); i++)
505 {
506 atrt_process * dst = config.m_processes[i];
507 if (dst->m_rep_src)
508 {
509 if (setup_repl(dst->m_rep_src, dst) != true)
510 return false;
511 }
512 }
513 return true;
514 }
515
516 template int find(atrt_host* obj, Vector<atrt_host*>& arr);
517 template int find(atrt_cluster* obj, Vector<atrt_cluster*>& arr);
518
519