1 /*
2    Copyright (C) 2008 MySQL AB, 2008-2010 Sun Microsystems, Inc.
3     All rights reserved. Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 */
25 
26 
27 #include "atrt.hpp"
28 #include <AtrtClient.hpp>
29 
30 
31 
find_atrtdb_client(atrt_config & config)32 MYSQL* find_atrtdb_client(atrt_config& config)
33 {
34   atrt_cluster* cluster = 0;
35   for (size_t i = 0; i<config.m_clusters.size(); i++)
36   {
37     if (strcmp(config.m_clusters[i]->m_name.c_str(), ".atrt") == 0)
38     {
39       cluster = config.m_clusters[i];
40 
41       for (size_t i = 0; i<cluster->m_processes.size(); i++)
42       {
43         if (cluster->m_processes[i]->m_type == atrt_process::AP_CLIENT)
44         {
45           atrt_process* atrt_client= cluster->m_processes[i];
46           if (!atrt_client)
47             return NULL; /* No atrt db */
48 
49           atrt_process* f_mysqld = atrt_client->m_mysqld;
50           assert(f_mysqld);
51 
52           return &f_mysqld->m_mysql;
53         }
54       }
55       break;
56     }
57   }
58   return NULL;
59 }
60 
61 
62 
63 static bool
ack_command(AtrtClient & atrtdb,int command_id,const char * state)64 ack_command(AtrtClient& atrtdb, int command_id, const char* state)
65 {
66   BaseString sql;
67   sql.assfmt("UPDATE command SET state = '%s' WHERE id = %d",
68              state, command_id);
69   return atrtdb.doQuery(sql);
70 }
71 
72 
73 BaseString
set_env_var(const BaseString & existing,const BaseString & name,const BaseString & value)74 set_env_var(const BaseString& existing,
75             const BaseString& name,
76             const BaseString& value)
77 {
78   /* Split existing on space
79    * (may have issues with env vars with spaces)
80    * Split assignments on =
81    * Where name == name, output new value
82    */
83   BaseString newEnv;
84   Vector<BaseString> assignments;
85   int assignmentCount = existing.split(assignments, BaseString(" "));
86 
87   for (int i=0; i < assignmentCount; i++)
88   {
89     Vector<BaseString> terms;
90     int termCount = assignments[i].split(terms, BaseString("="));
91 
92     if (termCount)
93     {
94       if (strcmp(name.c_str(), terms[0].c_str()) == 0)
95       {
96         /* Found element */
97         newEnv.append(name);
98         newEnv.append('=');
99         newEnv.append(value);
100       }
101       else
102       {
103         newEnv.append(assignments[i]);
104       }
105     }
106     newEnv.append(' ');
107   }
108 
109   return newEnv;
110 }
111 
112 
113 Vector<atrt_process> g_saved_procs;
114 
115 static
116 bool
do_change_version(atrt_config & config,SqlResultSet & command,AtrtClient & atrtdb)117 do_change_version(atrt_config& config, SqlResultSet& command,
118                   AtrtClient& atrtdb){
119   /**
120    * TODO make option to restart "not" initial
121    */
122   uint process_id= command.columnAsInt("process_id");
123   const char* process_args= command.column("process_args");
124 
125   g_logger.info("Change version for process: %d, args: %s",
126                 process_id, process_args);
127 
128   // Get the process
129   if (process_id > config.m_processes.size()){
130     g_logger.critical("Invalid process id %d", process_id);
131     return false;
132   }
133   atrt_process& proc= *config.m_processes[process_id];
134 
135   const char* new_prefix= g_prefix1 ? g_prefix1 : g_prefix;
136   const char* old_prefix= g_prefix;
137   const char *start= strstr(proc.m_proc.m_path.c_str(), old_prefix);
138   if (!start){
139     /* Process path does not contain old prefix.
140      * Perhaps it contains the new prefix - e.g. is already
141      * upgraded?
142      */
143     if (strstr(proc.m_proc.m_path.c_str(), new_prefix))
144     {
145       /* Process is already upgraded, *assume* that this
146        * is ok
147        * Alternatives could be - error, or downgrade.
148        */
149       g_logger.info("Process already upgraded");
150       return true;
151     }
152 
153     g_logger.critical("Could not find '%s' in '%s'",
154                       old_prefix, proc.m_proc.m_path.c_str());
155     return false;
156   }
157 
158   // Save current proc state
159   if (proc.m_save.m_saved == false)
160   {
161     proc.m_save.m_proc= proc.m_proc;
162     proc.m_save.m_saved= true;
163   }
164 
165   g_logger.info("stopping process...");
166   if (!stop_process(proc))
167     return false;
168   BaseString newEnv = set_env_var(proc.m_proc.m_env,
169                                   BaseString("MYSQL_BASE_DIR"),
170                                   BaseString(new_prefix));
171   proc.m_proc.m_env.assign(newEnv);
172   BaseString suffix(proc.m_proc.m_path.substr(strlen(old_prefix)));
173   proc.m_proc.m_path.assign(new_prefix).append(suffix);
174   if (process_args && strlen(process_args))
175   {
176     /* Beware too long args */
177     proc.m_proc.m_args.append(" ");
178     proc.m_proc.m_args.append(process_args);
179   }
180 
181   ndbout << proc << endl;
182 
183   g_logger.info("starting process...");
184   if (!start_process(proc))
185     return false;
186   return true;
187 }
188 
189 
190 static
191 bool
do_reset_proc(atrt_config & config,SqlResultSet & command,AtrtClient & atrtdb)192 do_reset_proc(atrt_config& config, SqlResultSet& command,
193                AtrtClient& atrtdb){
194   uint process_id= command.columnAsInt("process_id");
195   g_logger.info("Reset process: %d", process_id);
196 
197   // Get the process
198   if (process_id > config.m_processes.size()){
199     g_logger.critical("Invalid process id %d", process_id);
200     return false;
201   }
202   atrt_process& proc= *config.m_processes[process_id];
203 
204   g_logger.info("stopping process...");
205   if (!stop_process(proc))
206     return false;
207 
208   if (proc.m_save.m_saved)
209   {
210     ndbout << "before: " << proc << endl;
211 
212     proc.m_proc= proc.m_save.m_proc;
213     proc.m_save.m_saved= false;
214     proc.m_proc.m_id= -1;
215 
216     ndbout << "after: " << proc << endl;
217 
218   }
219   else
220   {
221     ndbout << "process has not changed" << endl;
222   }
223 
224   g_logger.info("starting process...");
225   if (!start_process(proc))
226     return false;
227   return true;
228 }
229 
230 
231 bool
do_command(atrt_config & config)232 do_command(atrt_config& config){
233 
234 #ifdef _WIN32
235   return true;
236 #endif
237 
238   MYSQL* mysql= find_atrtdb_client(config);
239   if (!mysql)
240     return true;
241 
242   AtrtClient atrtdb(mysql);
243   SqlResultSet command;
244   if (!atrtdb.doQuery("SELECT * FROM command " \
245                      "WHERE state = 'new' ORDER BY id LIMIT 1", command)){
246     g_logger.critical("query failed");
247     return false;
248   }
249 
250   if (command.numRows() == 0)
251     return true;
252 
253   uint id= command.columnAsInt("id");
254   uint cmd= command.columnAsInt("cmd");
255   g_logger.info("Got command, id: %d, cmd: %d", id, cmd);
256   // command.print();
257 
258   // Set state of command to running
259   if (!ack_command(atrtdb, id, "running"))
260     return false;
261 
262   switch (cmd){
263   case AtrtClient::ATCT_CHANGE_VERSION:
264     if (!do_change_version(config, command, atrtdb))
265       return false;
266     break;
267 
268   case AtrtClient::ATCT_RESET_PROC:
269     if (!do_reset_proc(config, command, atrtdb))
270       return false;
271     break;
272 
273   default:
274     command.print();
275     g_logger.error("got unknown command: %d", cmd);
276     return false;
277   }
278 
279   // Set state of command to done
280   if (!ack_command(atrtdb, id, "done"))
281     return false;
282 
283   g_logger.info("done!");
284 
285   return true;
286 }
287 
288 
289 template class Vector<atrt_process>;
290