1 /*
2 Copyright (C) 2008 MySQL AB, 2008-2010 Sun Microsystems, Inc.
3 All rights reserved. Use is subject to license terms.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 */
25
26
27 #include "atrt.hpp"
28 #include <AtrtClient.hpp>
29
30
31
find_atrtdb_client(atrt_config & config)32 MYSQL* find_atrtdb_client(atrt_config& config)
33 {
34 atrt_cluster* cluster = 0;
35 for (size_t i = 0; i<config.m_clusters.size(); i++)
36 {
37 if (strcmp(config.m_clusters[i]->m_name.c_str(), ".atrt") == 0)
38 {
39 cluster = config.m_clusters[i];
40
41 for (size_t i = 0; i<cluster->m_processes.size(); i++)
42 {
43 if (cluster->m_processes[i]->m_type == atrt_process::AP_CLIENT)
44 {
45 atrt_process* atrt_client= cluster->m_processes[i];
46 if (!atrt_client)
47 return NULL; /* No atrt db */
48
49 atrt_process* f_mysqld = atrt_client->m_mysqld;
50 assert(f_mysqld);
51
52 return &f_mysqld->m_mysql;
53 }
54 }
55 break;
56 }
57 }
58 return NULL;
59 }
60
61
62
63 static bool
ack_command(AtrtClient & atrtdb,int command_id,const char * state)64 ack_command(AtrtClient& atrtdb, int command_id, const char* state)
65 {
66 BaseString sql;
67 sql.assfmt("UPDATE command SET state = '%s' WHERE id = %d",
68 state, command_id);
69 return atrtdb.doQuery(sql);
70 }
71
72
73 BaseString
set_env_var(const BaseString & existing,const BaseString & name,const BaseString & value)74 set_env_var(const BaseString& existing,
75 const BaseString& name,
76 const BaseString& value)
77 {
78 /* Split existing on space
79 * (may have issues with env vars with spaces)
80 * Split assignments on =
81 * Where name == name, output new value
82 */
83 BaseString newEnv;
84 Vector<BaseString> assignments;
85 int assignmentCount = existing.split(assignments, BaseString(" "));
86
87 for (int i=0; i < assignmentCount; i++)
88 {
89 Vector<BaseString> terms;
90 int termCount = assignments[i].split(terms, BaseString("="));
91
92 if (termCount)
93 {
94 if (strcmp(name.c_str(), terms[0].c_str()) == 0)
95 {
96 /* Found element */
97 newEnv.append(name);
98 newEnv.append('=');
99 newEnv.append(value);
100 }
101 else
102 {
103 newEnv.append(assignments[i]);
104 }
105 }
106 newEnv.append(' ');
107 }
108
109 return newEnv;
110 }
111
112
113 Vector<atrt_process> g_saved_procs;
114
115 static
116 bool
do_change_version(atrt_config & config,SqlResultSet & command,AtrtClient & atrtdb)117 do_change_version(atrt_config& config, SqlResultSet& command,
118 AtrtClient& atrtdb){
119 /**
120 * TODO make option to restart "not" initial
121 */
122 uint process_id= command.columnAsInt("process_id");
123 const char* process_args= command.column("process_args");
124
125 g_logger.info("Change version for process: %d, args: %s",
126 process_id, process_args);
127
128 // Get the process
129 if (process_id > config.m_processes.size()){
130 g_logger.critical("Invalid process id %d", process_id);
131 return false;
132 }
133 atrt_process& proc= *config.m_processes[process_id];
134
135 const char* new_prefix= g_prefix1 ? g_prefix1 : g_prefix;
136 const char* old_prefix= g_prefix;
137 const char *start= strstr(proc.m_proc.m_path.c_str(), old_prefix);
138 if (!start){
139 /* Process path does not contain old prefix.
140 * Perhaps it contains the new prefix - e.g. is already
141 * upgraded?
142 */
143 if (strstr(proc.m_proc.m_path.c_str(), new_prefix))
144 {
145 /* Process is already upgraded, *assume* that this
146 * is ok
147 * Alternatives could be - error, or downgrade.
148 */
149 g_logger.info("Process already upgraded");
150 return true;
151 }
152
153 g_logger.critical("Could not find '%s' in '%s'",
154 old_prefix, proc.m_proc.m_path.c_str());
155 return false;
156 }
157
158 // Save current proc state
159 if (proc.m_save.m_saved == false)
160 {
161 proc.m_save.m_proc= proc.m_proc;
162 proc.m_save.m_saved= true;
163 }
164
165 g_logger.info("stopping process...");
166 if (!stop_process(proc))
167 return false;
168 BaseString newEnv = set_env_var(proc.m_proc.m_env,
169 BaseString("MYSQL_BASE_DIR"),
170 BaseString(new_prefix));
171 proc.m_proc.m_env.assign(newEnv);
172 BaseString suffix(proc.m_proc.m_path.substr(strlen(old_prefix)));
173 proc.m_proc.m_path.assign(new_prefix).append(suffix);
174 if (process_args && strlen(process_args))
175 {
176 /* Beware too long args */
177 proc.m_proc.m_args.append(" ");
178 proc.m_proc.m_args.append(process_args);
179 }
180
181 ndbout << proc << endl;
182
183 g_logger.info("starting process...");
184 if (!start_process(proc))
185 return false;
186 return true;
187 }
188
189
190 static
191 bool
do_reset_proc(atrt_config & config,SqlResultSet & command,AtrtClient & atrtdb)192 do_reset_proc(atrt_config& config, SqlResultSet& command,
193 AtrtClient& atrtdb){
194 uint process_id= command.columnAsInt("process_id");
195 g_logger.info("Reset process: %d", process_id);
196
197 // Get the process
198 if (process_id > config.m_processes.size()){
199 g_logger.critical("Invalid process id %d", process_id);
200 return false;
201 }
202 atrt_process& proc= *config.m_processes[process_id];
203
204 g_logger.info("stopping process...");
205 if (!stop_process(proc))
206 return false;
207
208 if (proc.m_save.m_saved)
209 {
210 ndbout << "before: " << proc << endl;
211
212 proc.m_proc= proc.m_save.m_proc;
213 proc.m_save.m_saved= false;
214 proc.m_proc.m_id= -1;
215
216 ndbout << "after: " << proc << endl;
217
218 }
219 else
220 {
221 ndbout << "process has not changed" << endl;
222 }
223
224 g_logger.info("starting process...");
225 if (!start_process(proc))
226 return false;
227 return true;
228 }
229
230
231 bool
do_command(atrt_config & config)232 do_command(atrt_config& config){
233
234 #ifdef _WIN32
235 return true;
236 #endif
237
238 MYSQL* mysql= find_atrtdb_client(config);
239 if (!mysql)
240 return true;
241
242 AtrtClient atrtdb(mysql);
243 SqlResultSet command;
244 if (!atrtdb.doQuery("SELECT * FROM command " \
245 "WHERE state = 'new' ORDER BY id LIMIT 1", command)){
246 g_logger.critical("query failed");
247 return false;
248 }
249
250 if (command.numRows() == 0)
251 return true;
252
253 uint id= command.columnAsInt("id");
254 uint cmd= command.columnAsInt("cmd");
255 g_logger.info("Got command, id: %d, cmd: %d", id, cmd);
256 // command.print();
257
258 // Set state of command to running
259 if (!ack_command(atrtdb, id, "running"))
260 return false;
261
262 switch (cmd){
263 case AtrtClient::ATCT_CHANGE_VERSION:
264 if (!do_change_version(config, command, atrtdb))
265 return false;
266 break;
267
268 case AtrtClient::ATCT_RESET_PROC:
269 if (!do_reset_proc(config, command, atrtdb))
270 return false;
271 break;
272
273 default:
274 command.print();
275 g_logger.error("got unknown command: %d", cmd);
276 return false;
277 }
278
279 // Set state of command to done
280 if (!ack_command(atrtdb, id, "done"))
281 return false;
282
283 g_logger.info("done!");
284
285 return true;
286 }
287
288
289 template class Vector<atrt_process>;
290