1 /*
2    Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "atrt.hpp"
26 #include <NdbSleep.h>
27 
28 static bool connect_mysqld(atrt_process* proc);
29 static bool populate_db(atrt_config&, atrt_process*);
30 static bool setup_repl(atrt_config&);
31 
32 static
33 bool
run_query(atrt_process * proc,const char * query)34 run_query(atrt_process* proc, const char * query)
35 {
36   MYSQL* mysql = &proc->m_mysql;
37   g_logger.debug("'%s@%s' - Running query '%s'",
38 		 proc->m_cluster->m_name.c_str(),
39 		 proc->m_host->m_hostname.c_str(),
40                  query);
41 
42   if (mysql_query(mysql, query))
43   {
44     g_logger.error("'%s@%s' - Failed to run query '%s' %d:%s",
45                    proc->m_cluster->m_name.c_str(),
46                    proc->m_host->m_hostname.c_str(),
47                    query,
48                    mysql_errno(mysql),
49                    mysql_error(mysql));
50     return false;
51   }
52   return true;
53 }
54 
/**
 * Statements that create the "atrt" bookkeeping schema, in execution order.
 *
 * Each array element is one complete statement assembled from adjacent
 * string literals; the list is terminated by a 0 entry so it can be walked
 * without a separate length.
 */
static const char* create_sql[] = {
  /* database itself, then make it the default for what follows */
  "create database atrt",

  "use atrt",

  /* one row per host */
  "create table host ("
  "   id int primary key,"
  "   name varchar(255),"
  "   port int unsigned,"
  "   unique(name, port)"
  ") engine = myisam;",

  /* one row per cluster */
  "create table cluster ("
  "   id int primary key,"
  "   name varchar(255),"
  "   unique(name)"
  "   ) engine = myisam;",

  /* one row per process, referring to host and cluster by id */
  "create table process ("
  "  id int primary key,"
  "  host_id int not null,"
  "  cluster_id int not null,"
  "  node_id int not null,"
  "  type enum ('ndbd', 'ndbapi', 'ndb_mgmd', 'mysqld', 'mysql') not null,"
  "  state enum ('starting', 'started', 'stopping', 'stopped') not null"
  "  ) engine = myisam;",

  /* name/value option pairs attached to a process */
  "create table options ("
  "  id int primary key,"
  "  process_id int not null,"
  "  name varchar(255) not null,"
  "  value varchar(255) not null"
  "  ) engine = myisam;",

  /* master/slave pairs */
  "create table repl ("
  "  id int auto_increment primary key,"
  "  master_id int not null,"
  "  slave_id int not null"
  "  ) engine = myisam;",

  /* command queue */
  "create table command ("
  "  id int auto_increment primary key,"
  "  state enum ('new', 'running', 'done') not null default 'new',"
  "  cmd int not null,"
  "  process_id int not null,"
  "  process_args varchar(255) default NULL"
  "  ) engine = myisam;",

  /* end-of-list marker */
  0
};
104 
105 bool
setup_db(atrt_config & config)106 setup_db(atrt_config& config)
107 {
108   /**
109    * Install atrt db
110    */
111   atrt_process* atrt_client = 0;
112   {
113     atrt_cluster* cluster = 0;
114     for (size_t i = 0; i<config.m_clusters.size(); i++)
115     {
116       if (strcmp(config.m_clusters[i]->m_name.c_str(), ".atrt") == 0)
117       {
118 	cluster = config.m_clusters[i];
119 
120 	for (size_t i = 0; i<cluster->m_processes.size(); i++)
121 	{
122 	  if (cluster->m_processes[i]->m_type == atrt_process::AP_CLIENT)
123 	  {
124 	    atrt_client = cluster->m_processes[i];
125 	    break;
126 	  }
127 	}
128 	break;
129       }
130     }
131   }
132 
133   /**
134    * connect to all mysqld's
135    */
136 #ifndef _WIN32
137   for (size_t i = 0; i<config.m_processes.size(); i++)
138   {
139     atrt_process * proc = config.m_processes[i];
140     if (proc->m_type == atrt_process::AP_MYSQLD)
141     {
142       if (!connect_mysqld(config.m_processes[i]))
143 	return false;
144     }
145   }
146 
147   if (atrt_client)
148   {
149     atrt_process* atrt_mysqld = atrt_client->m_mysqld;
150     assert(atrt_mysqld);
151 
152     // Run the commands to create the db
153     for (int i = 0; create_sql[i]; i++)
154     {
155       const char* query = create_sql[i];
156       if (!run_query(atrt_mysqld, query))
157         return false;
158     }
159 
160     if (!populate_db(config, atrt_mysqld))
161       return false;
162   }
163 
164   /**
165    * setup replication
166    */
167   if (setup_repl(config) != true)
168     return false;
169  #endif
170 
171   return true;
172 }
173 
174 static
175 const char*
find(atrt_process * proc,const char * key)176 find(atrt_process* proc, const char * key)
177 {
178   const char * res = 0;
179   if (proc->m_options.m_loaded.get(key, &res))
180     return res;
181 
182   proc->m_options.m_generated.get(key, &res);
183   return res;
184 }
185 
186 bool
connect_mysqld(atrt_process * proc)187 connect_mysqld(atrt_process* proc)
188 {
189   if ( !mysql_init(&proc->m_mysql))
190   {
191     g_logger.error("Failed to init mysql");
192     return false;
193   }
194 
195   const char * port = find(proc, "--port=");
196   const char * socket = find(proc, "--socket=");
197   if (port == 0 && socket == 0)
198   {
199     g_logger.error("Neither socket nor port specified...cant connect to mysql");
200     return false;
201   }
202 
203   for (size_t i = 0; i<20; i++)
204   {
205     if (port)
206     {
207       mysql_protocol_type val = MYSQL_PROTOCOL_TCP;
208       mysql_options(&proc->m_mysql, MYSQL_OPT_PROTOCOL, &val);
209     }
210     if (mysql_real_connect(&proc->m_mysql,
211 			   proc->m_host->m_hostname.c_str(),
212 			   "root", "", "test",
213 			   port ? atoi(port) : 0,
214 			   socket,
215 			   0))
216     {
217       return true;
218     }
219     g_logger.info("Retrying connect to %s:%u 3s",
220 		  proc->m_host->m_hostname.c_str(),atoi(port));
221     NdbSleep_SecSleep(3);
222   }
223 
224   g_logger.error("Failed to connect to mysqld err: >%s< >%s:%u:%s<",
225 		 mysql_error(&proc->m_mysql),
226 		 proc->m_host->m_hostname.c_str(), port ? atoi(port) : 0,
227 		 socket ? socket : "<null>");
228   return false;
229 }
230 
231 void
BINDI(MYSQL_BIND & bind,int * i)232 BINDI(MYSQL_BIND& bind, int * i)
233 {
234   bind.buffer_type= MYSQL_TYPE_LONG;
235   bind.buffer= (char*)i;
236   bind.is_unsigned= 0;
237   bind.is_null= 0;
238 }
239 
240 void
BINDS(MYSQL_BIND & bind,const char * s,unsigned long * len)241 BINDS(MYSQL_BIND& bind, const char * s, unsigned long * len)
242 {
243   bind.buffer_type= MYSQL_TYPE_STRING;
244   bind.buffer= (char*)s;
245   bind.buffer_length= * len = strlen(s);
246   bind.length= len;
247   bind.is_null= 0;
248 }
249 
250 template <typename T>
251 int
find(T * obj,Vector<T * > & arr)252 find(T* obj, Vector<T*>& arr)
253 {
254   for (size_t i = 0; i<arr.size(); i++)
255     if (arr[i] == obj)
256       return (int)i;
257   abort();
258   return -1;
259 }
260 
261 static
262 bool
populate_options(MYSQL * mysql,MYSQL_STMT * stmt,int * option_id,int process_id,Properties * p)263 populate_options(MYSQL* mysql, MYSQL_STMT* stmt, int* option_id,
264 		 int process_id, Properties* p)
265 {
266   int kk = *option_id;
267   Properties::Iterator it(p);
268   const char * name = it.first();
269   for (; name; name = it.next())
270   {
271     int optid = kk;
272     int proc_id = process_id;
273     unsigned long l0, l1;
274     const char * value;
275     p->get(name, &value);
276     MYSQL_BIND bind2[4];
277     bzero(bind2, sizeof(bind2));
278     BINDI(bind2[0], &optid);
279     BINDI(bind2[1], &proc_id);
280     BINDS(bind2[2], name, &l0);
281     BINDS(bind2[3], value, &l1);
282 
283     if (mysql_stmt_bind_param(stmt, bind2))
284     {
285       g_logger.error("Failed to bind: %s", mysql_error(mysql));
286       return false;
287     }
288 
289     if (mysql_stmt_execute(stmt))
290     {
291       g_logger.error("0 Failed to execute: %s", mysql_error(mysql));
292       return false;
293     }
294     kk++;
295   }
296   *option_id = kk;
297   return true;
298 }
299 
300 static
301 bool
populate_db(atrt_config & config,atrt_process * mysqld)302 populate_db(atrt_config& config, atrt_process* mysqld)
303 {
304   {
305     const char * sql = "INSERT INTO host (id, name, port) values (?, ?, ?)";
306     MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
307     if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
308     {
309       g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
310       return false;
311     }
312 
313     for (size_t i = 0; i<config.m_hosts.size(); i++)
314     {
315       unsigned long l0;
316       MYSQL_BIND bind[3];
317       bzero(bind, sizeof(bind));
318       int id = i;
319       int port = config.m_hosts[i]->m_cpcd->getPort();
320       BINDI(bind[0], &id);
321       BINDS(bind[1], config.m_hosts[i]->m_hostname.c_str(), &l0);
322       BINDI(bind[2], &port);
323       if (mysql_stmt_bind_param(stmt, bind))
324       {
325 	g_logger.error("Failed to bind: %s", mysql_error(&mysqld->m_mysql));
326 	return false;
327       }
328 
329       if (mysql_stmt_execute(stmt))
330       {
331 	g_logger.error("1 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
332 	return false;
333       }
334     }
335     mysql_stmt_close(stmt);
336   }
337 
338   {
339     const char * sql = "INSERT INTO cluster (id, name) values (?, ?)";
340     MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
341     if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
342     {
343       g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
344       return false;
345     }
346 
347     for (size_t i = 0; i<config.m_clusters.size(); i++)
348     {
349       unsigned long l0;
350       MYSQL_BIND bind[2];
351       bzero(bind, sizeof(bind));
352       int id = i;
353       BINDI(bind[0], &id);
354       BINDS(bind[1], config.m_clusters[i]->m_name.c_str(), &l0);
355 
356       if (mysql_stmt_bind_param(stmt, bind))
357       {
358 	g_logger.error("Failed to bind: %s", mysql_error(&mysqld->m_mysql));
359 	return false;
360       }
361 
362       if (mysql_stmt_execute(stmt))
363       {
364 	g_logger.error("2 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
365 	return false;
366       }
367     }
368     mysql_stmt_close(stmt);
369   }
370 
371   {
372     const char * sql =
373       "INSERT INTO process (id, host_id, cluster_id, type, state, node_id) values (?,?,?,?,?,?)";
374 
375     const char * sqlopt =
376       "INSERT INTO options (id, process_id, name, value) values (?,?,?,?)";
377 
378     MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
379     if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
380     {
381       g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
382       return false;
383     }
384 
385     MYSQL_STMT * stmtopt = mysql_stmt_init(&mysqld->m_mysql);
386     if (mysql_stmt_prepare(stmtopt, sqlopt, strlen(sqlopt)))
387     {
388       g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
389       return false;
390     }
391 
392     int option_id = 0;
393     for (size_t i = 0; i<config.m_processes.size(); i++)
394     {
395       unsigned long l0, l1;
396       MYSQL_BIND bind[6];
397       bzero(bind, sizeof(bind));
398       int id = i;
399       atrt_process* proc = config.m_processes[i];
400       int host_id = find(proc->m_host, config.m_hosts);
401       int cluster_id = find(proc->m_cluster, config.m_clusters);
402       int node_id= proc->m_nodeid;
403 
404       const char * type = 0;
405       const char * state = "started";
406       switch(proc->m_type){
407       case atrt_process::AP_NDBD:     type = "ndbd"; break;
408       case atrt_process::AP_NDB_API:  type = "ndbapi"; state = "stopped";break;
409       case atrt_process::AP_NDB_MGMD: type = "ndb_mgmd"; break;
410       case atrt_process::AP_MYSQLD:   type = "mysqld"; break;
411       case atrt_process::AP_CLIENT:   type = "mysql"; state = "stopped";break;
412       default:
413 	abort();
414       }
415 
416       BINDI(bind[0], &id);
417       BINDI(bind[1], &host_id);
418       BINDI(bind[2], &cluster_id);
419       BINDS(bind[3], type, &l0);
420       BINDS(bind[4], state, &l1);
421       BINDI(bind[5], &node_id);
422 
423       if (mysql_stmt_bind_param(stmt, bind))
424       {
425 	g_logger.error("Failed to bind: %s", mysql_error(&mysqld->m_mysql));
426 	return false;
427       }
428 
429       if (mysql_stmt_execute(stmt))
430       {
431 	g_logger.error("3 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
432 	return false;
433       }
434 
435       if (populate_options(&mysqld->m_mysql, stmtopt, &option_id, id,
436 			   &proc->m_options.m_loaded) == false)
437 	return false;
438 
439       if (populate_options(&mysqld->m_mysql, stmtopt, &option_id, id,
440 			   &proc->m_cluster->m_options.m_loaded) == false)
441 	return false;
442 
443     }
444     mysql_stmt_close(stmt);
445     mysql_stmt_close(stmtopt);
446   }
447 
448   return true;
449 }
450 
451 static
452 bool
setup_repl(atrt_process * dst,atrt_process * src)453 setup_repl(atrt_process* dst, atrt_process* src)
454 {
455   if (!run_query(src, "STOP SLAVE"))
456   {
457     g_logger.error("Failed to stop slave: %s",
458 		   mysql_error(&src->m_mysql));
459     return false;
460   }
461 
462   if (!run_query(src, "RESET SLAVE"))
463   {
464     g_logger.error("Failed to reset slave: %s",
465 		   mysql_error(&src->m_mysql));
466     return false;
467   }
468 
469   BaseString tmp;
470   tmp.assfmt("CHANGE MASTER TO   "
471 	     " MASTER_HOST='%s', "
472 	     " MASTER_PORT=%u    ",
473 	     dst->m_host->m_hostname.c_str(),
474 	     atoi(find(dst, "--port=")));
475 
476   if (!run_query(src, tmp.c_str()))
477   {
478     g_logger.error("Failed to setup repl from %s to %s: %s",
479 		   src->m_host->m_hostname.c_str(),
480 		   dst->m_host->m_hostname.c_str(),
481 		   mysql_error(&src->m_mysql));
482     return false;
483   }
484 
485   if (!run_query(src, "START SLAVE"))
486   {
487     g_logger.error("Failed to start slave: %s",
488 		   mysql_error(&src->m_mysql));
489     return false;
490   }
491 
492   g_logger.info("Replication from %s(%s) to %s(%s) setup",
493 		src->m_host->m_hostname.c_str(),
494 		src->m_cluster->m_name.c_str(),
495 		dst->m_host->m_hostname.c_str(),
496 		dst->m_cluster->m_name.c_str());
497 
498   return true;
499 }
500 
501 bool
setup_repl(atrt_config & config)502 setup_repl(atrt_config& config)
503 {
504   for (size_t i = 0; i<config.m_processes.size(); i++)
505   {
506     atrt_process * dst = config.m_processes[i];
507     if (dst->m_rep_src)
508     {
509       if (setup_repl(dst->m_rep_src, dst) != true)
510 	return false;
511     }
512   }
513   return true;
514 }
515 
516 template int find(atrt_host* obj, Vector<atrt_host*>& arr);
517 template int find(atrt_cluster* obj, Vector<atrt_cluster*>& arr);
518 
519