1 /*
2 Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights
3 reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
24 02110-1301 USA
25 */
26 #include <my_config.h>
27 #include <stdio.h>
28 #include <pthread.h>
29
30 #include <memcached/extension_loggers.h>
31
32 #include <NdbApi.hpp>
33
34 #include "Configuration.h"
35 #include "ndb_configuration.h"
36 #include "debug.h"
37 #include "workitem.h"
38 #include "NdbInstance.h"
39 #include "ndb_pipeline.h"
40 #include "thread_identifier.h"
41 #include "Scheduler.h"
42 #include "ExternalValue.h"
43
44 /* A static global variable */
45 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
46
47 /* From ndb_pipeline */
48 extern int workitem_class_id;
49 extern int workitem_actual_inline_buffer_size;
50
51
52 /* An external Function */
53 extern "C" {
54 void cache_set_initial_cas_id(uint64_t cas); /* In cache-src/items.c */
55 }
56
57 Configuration * active_config = 0;
58 Configuration * next_config = 0;
59 Configuration * stale_config = 0;
60
61 /* This function has C++ linkage */
get_Configuration()62 Configuration & get_Configuration() {
63 return *active_config;
64 };
65
66
67 /* This function has C linkage */
connect_to_primary_cluster(const char * connectstring,const char * server_role)68 bool connect_to_primary_cluster(const char *connectstring,
69 const char *server_role) {
70 DEBUG_ENTER();
71 active_config = new Configuration;
72 active_config->setPrimaryConnectString(connectstring);
73 active_config->setServerRole(server_role);
74 return active_config->connectToPrimary();
75 }
76
77
read_configuration(Configuration * cf)78 bool read_configuration(Configuration *cf) {
79 const char *method[4] = {
80 "is ignored",
81 "uses NDB only",
82 "uses local cache only",
83 "uses NDB with local cache"
84 };
85 int npref;
86 int log_msg_sz = 0;
87 const int log_buf_sz = 2048;
88 char logmsg[log_buf_sz];
89
90 if(cf->readConfiguration()) {
91 const KeyPrefix *p = cf->getDefaultPrefix();
92 npref = cf->nprefixes;
93 unsigned pGET = (p->info.do_mc_read * 2) + p->info.do_db_read;
94 unsigned pSET = (p->info.do_mc_write * 2) + p->info.do_db_write;
95 unsigned pDEL = (p->info.do_mc_delete * 2) + p->info.do_db_delete;
96
97 logger->log(LOG_WARNING, NULL,
98 "Retrieved %d key prefix%s for server role \"%s\".\n"
99 "The default behavior is that: \n"
100 " GET %s\n SET %s\n DELETE %s.\n",
101 cf->nprefixes,
102 cf->nprefixes == 1 ? "" : "es", cf->getServerRole(),
103 method[pGET], method[pSET], method[pDEL]);
104
105 if(npref > 1) { /* List all non-default prefixes */
106 log_msg_sz = snprintf(logmsg, log_buf_sz - log_msg_sz,
107 "The %d explicitly defined key prefix%s ",
108 npref - 1, npref == 2 ? " is" : "es are");
109 for(int i = 1 ; i < npref ; i ++) {
110 log_msg_sz += snprintf(logmsg + log_msg_sz, log_buf_sz - log_msg_sz,
111 "%s\"%s\" (%s)",
112 i > 1 ? (i == npref - 1 ? " and " : ", ") : "",
113 cf->getPrefix(i)->prefix,
114 cf->getPrefix(i)->table ?
115 cf->getPrefix(i)->table->table_name : "");
116 }
117 snprintf(logmsg + log_msg_sz, log_buf_sz - log_msg_sz, "\n");
118 logger->log(LOG_WARNING, NULL, logmsg);
119 }
120 return true;
121 }
122 return false;
123 }
124
125
126 /* This function has C linkage */
get_config()127 bool get_config() {
128 return read_configuration(active_config);
129 }
130
131
132 /* This function has C linkage */
open_connections_to_all_clusters()133 bool open_connections_to_all_clusters() {
134 return active_config->openAllConnections();
135 }
136
137
138 /* This function has C linkage */
prefetch_dictionary_objects()139 bool prefetch_dictionary_objects() {
140 return active_config->prefetchDictionary();
141 }
142
143
144 /* This function has C linkage */
set_initial_cas_ids(unsigned int * hi,atomic_int32_t * lo)145 void set_initial_cas_ids(unsigned int *hi, atomic_int32_t *lo) {
146 /* Set the initial CAS for the default engine: */
147 /* XXXXX disabled. Because we're linking with the actual default engine,
148 we don't have the opportunity to coordinate CAS IDs between the two
149 engines. */
150 // cache_set_initial_cas_id(active_config->initial_cas.for_default_engine);
151
152 /* Set the initial CAS for the NDB engine: */
153 *hi = active_config->initial_cas.for_ndb_engine >> 32;
154 *lo = active_config->initial_cas.for_ndb_engine & 0xFFFFFFFF;
155 }
156
157 /* This function has C linkage */
get_prefix_info_for_key(int nkey,const char * key)158 prefix_info_t get_prefix_info_for_key(int nkey, const char *key) {
159 const KeyPrefix *prefix = active_config->getPrefixForKey(key, nkey);
160 return prefix->info;
161 }
162
163
/* Shutdown hook; intentionally does nothing.
   (Per the original note, this function has C linkage.)

   It would run only at shutdown time.  The call to
   active_config->disconnectAll() is disabled to silence a warning about
   "Deleting Ndb_cluster_connection with Ndb-object not deleted". */
void disconnect_all() {
}
170
171
/* Prints the sizes of the main engine data structures to the debug log.
   (Per the original note, this function has C linkage.)
   Compiles to an empty function unless DEBUG_OUTPUT is defined.

   Every value is converted to unsigned long before being printed so that
   the argument type matches the "%lu" conversion on every platform;
   size_t is not unsigned long everywhere. */
void print_debug_startup_info() {
#ifdef DEBUG_OUTPUT
  /* Shift in unsigned long rather than int. */
  const unsigned long wi1 = 1UL << workitem_class_id;
  const unsigned long wi2 =
    (unsigned long) (sizeof(workitem) - WORKITEM_MIN_INLINE_BUF);
  const unsigned long wi3 = (unsigned long) workitem_actual_inline_buffer_size;

  DEBUG_PRINT(" sizeof Ndb : %lu", (unsigned long) sizeof(Ndb));
  DEBUG_PRINT(" sizeof NdbInstance : %lu", (unsigned long) sizeof(NdbInstance));
  DEBUG_PRINT(" sizeof workitem : %lu (%lu + buffer: %lu)", wi1, wi2, wi3);
  DEBUG_PRINT(" sizeof ExternalValue : %lu",
              (unsigned long) sizeof(ExternalValue));
#endif
}
184
185
reconfigure(Scheduler * s)186 void reconfigure(Scheduler *s) {
187 DEBUG_ENTER();
188
189 next_config = new Configuration(active_config);
190
191 if(! read_configuration(next_config)) {
192 logger->log(LOG_WARNING, 0, "Online reconfiguration failed.");
193 }
194 else if(! s->global_reconfigure(next_config)) {
195 logger->log(LOG_WARNING, 0,
196 "Online configuration aborted -- not supported by scheduler.");
197 }
198 else {
199 /* There is no garbage collection here, but there could be if Configuration
200 had a carefully-written destructor. */
201 stale_config = active_config;
202 active_config = next_config;
203 next_config = 0;
204 logger->log(LOG_WARNING, 0, "ONLINE RECONFIGURATION COMPLETE");
205 }
206 }
207
208
209 extern "C" {
210 void * run_reconfig_listener_thread(void *);
211 }
212
213 // TODO: This could take a GlobalConfigManager rather than a Scheduler
run_reconfig_listener_thread(void * p)214 void * run_reconfig_listener_thread(void *p) {
215 thread_identifier tid;
216 tid.pipeline = 0;
217 strcpy(tid.name,"config_listener");
218 set_thread_id(&tid);
219
220 DEBUG_ENTER();
221 ndb_pipeline * pipeline = (ndb_pipeline *) p;
222
223 while(1) {
224 int i = active_config->waitForReconfSignal();
225
226 if(i == 0) {
227 DEBUG_PRINT("will listen again.");
228 }
229 else if(i == 1) {
230 DEBUG_PRINT("reconfiguring");
231 reconfigure(pipeline->scheduler);
232 }
233 else {
234 DEBUG_PRINT("error (%d); exiting.", i);
235 break;
236 }
237 }
238 return 0;
239 }
240
241
242 /* This function has C linkage */
start_reconfig_listener(void * pipeline)243 void start_reconfig_listener(void *pipeline) {
244 DEBUG_ENTER();
245 if(active_config->canReloadOnline()) {
246 pthread_t thd_id;
247
248 DEBUG_PRINT("Starting thread.");
249 pthread_create(& thd_id, NULL, run_reconfig_listener_thread, pipeline);
250 }
251 else {
252 DEBUG_PRINT("Not supported.");
253 }
254 }
255