1 /*
2  Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights
3  reserved.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License, version 2.0,
7  as published by the Free Software Foundation.
8 
9  This program is also distributed with certain software (including
10  but not limited to OpenSSL) that is licensed under separate terms,
11  as designated in a particular file or component or in included license
12  documentation.  The authors of MySQL hereby grant you an additional
13  permission to link the program and your derivative works with the
14  separately licensed software that they have included with MySQL.
15 
16  This program is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  GNU General Public License, version 2.0, for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with this program; if not, write to the Free Software
23  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
24  02110-1301  USA
25  */
26 #include <my_config.h>
27 #include <stdio.h>
28 #include <pthread.h>
29 
30 #include <memcached/extension_loggers.h>
31 
32 #include <NdbApi.hpp>
33 
34 #include "Configuration.h"
35 #include "ndb_configuration.h"
36 #include "debug.h"
37 #include "workitem.h"
38 #include "NdbInstance.h"
39 #include "ndb_pipeline.h"
40 #include "thread_identifier.h"
41 #include "Scheduler.h"
42 #include "ExternalValue.h"
43 
44 /* A static global variable */
45 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
46 
47 /* From ndb_pipeline */
48 extern int workitem_class_id;
49 extern int workitem_actual_inline_buffer_size;
50 
51 
52 /* An external Function */
53 extern "C" {
54   void cache_set_initial_cas_id(uint64_t cas);    /* In cache-src/items.c */
55 }
56 
57 Configuration * active_config = 0;
58 Configuration * next_config = 0;
59 Configuration * stale_config = 0;
60 
61 /* This function has C++ linkage */
get_Configuration()62 Configuration & get_Configuration() {
63   return *active_config;
64 };
65 
66 
67 /* This function has C linkage */
connect_to_primary_cluster(const char * connectstring,const char * server_role)68 bool connect_to_primary_cluster(const char *connectstring,
69                                 const char *server_role) {
70   DEBUG_ENTER();
71   active_config = new Configuration;
72   active_config->setPrimaryConnectString(connectstring);
73   active_config->setServerRole(server_role);
74   return active_config->connectToPrimary();
75 }
76 
77 
read_configuration(Configuration * cf)78 bool read_configuration(Configuration *cf) {
79   const char *method[4] = {
80     "is ignored",
81     "uses NDB only",
82     "uses local cache only",
83     "uses NDB with local cache"
84   };
85   int npref;
86   int log_msg_sz = 0;
87   const int log_buf_sz = 2048;
88   char logmsg[log_buf_sz];
89 
90   if(cf->readConfiguration()) {
91     const KeyPrefix *p = cf->getDefaultPrefix();
92     npref = cf->nprefixes;
93     unsigned pGET = (p->info.do_mc_read * 2) + p->info.do_db_read;
94     unsigned pSET = (p->info.do_mc_write * 2) + p->info.do_db_write;
95     unsigned pDEL = (p->info.do_mc_delete * 2) + p->info.do_db_delete;
96 
97     logger->log(LOG_WARNING, NULL,
98                 "Retrieved %d key prefix%s for server role \"%s\".\n"
99                 "The default behavior is that: \n"
100                 "    GET %s\n    SET %s\n    DELETE %s.\n",
101                 cf->nprefixes,
102                 cf->nprefixes == 1 ? "" : "es", cf->getServerRole(),
103                 method[pGET], method[pSET], method[pDEL]);
104 
105     if(npref > 1) {  /* List all non-default prefixes */
106       log_msg_sz = snprintf(logmsg, log_buf_sz - log_msg_sz,
107                             "The %d explicitly defined key prefix%s ",
108                             npref - 1, npref == 2 ? " is" : "es are");
109       for(int i = 1 ; i < npref ; i ++) {
110         log_msg_sz += snprintf(logmsg + log_msg_sz, log_buf_sz - log_msg_sz,
111                               "%s\"%s\" (%s)",
112                               i > 1 ? (i == npref - 1 ? " and " : ", ") : "",
113                               cf->getPrefix(i)->prefix,
114                               cf->getPrefix(i)->table ?
115                               cf->getPrefix(i)->table->table_name : "");
116       }
117       snprintf(logmsg + log_msg_sz, log_buf_sz - log_msg_sz, "\n");
118       logger->log(LOG_WARNING, NULL, logmsg);
119     }
120     return true;
121   }
122   return false;
123 }
124 
125 
126 /* This function has C linkage */
get_config()127 bool get_config() {
128   return read_configuration(active_config);
129 }
130 
131 
132 /* This function has C linkage */
open_connections_to_all_clusters()133 bool open_connections_to_all_clusters() {
134   return active_config->openAllConnections();
135 }
136 
137 
138 /* This function has C linkage */
prefetch_dictionary_objects()139 bool prefetch_dictionary_objects() {
140   return active_config->prefetchDictionary();
141 }
142 
143 
144 /* This function has C linkage */
set_initial_cas_ids(unsigned int * hi,atomic_int32_t * lo)145 void set_initial_cas_ids(unsigned int *hi, atomic_int32_t *lo) {
146   /* Set the initial CAS for the default engine: */
147   /* XXXXX disabled.  Because we're linking with the actual default engine,
148      we don't have the opportunity to coordinate CAS IDs between the two
149      engines. */
150   // cache_set_initial_cas_id(active_config->initial_cas.for_default_engine);
151 
152   /* Set the initial CAS for the NDB engine: */
153   *hi = active_config->initial_cas.for_ndb_engine >> 32;
154   *lo = active_config->initial_cas.for_ndb_engine & 0xFFFFFFFF;
155 }
156 
157 /* This function has C linkage */
get_prefix_info_for_key(int nkey,const char * key)158 prefix_info_t get_prefix_info_for_key(int nkey, const char *key) {
159   const KeyPrefix *prefix = active_config->getPrefixForKey(key, nkey);
160   return prefix->info;
161 }
162 
163 
164 /* This function has C linkage */
disconnect_all()165 void disconnect_all() {
166   /* Run only at shutdown time.  Disabled to silence a warning about
167      "Deleting Ndb_cluster_connection with Ndb-object not deleted" */
168   // active_config->disconnectAll();
169 }
170 
171 
172 /* This function has C linkage */
print_debug_startup_info()173 void print_debug_startup_info() {
174 #ifdef DEBUG_OUTPUT
175   size_t wi1 = 1 << workitem_class_id;
176   size_t wi2 = sizeof(workitem) - WORKITEM_MIN_INLINE_BUF;
177   size_t wi3 = workitem_actual_inline_buffer_size;
178   DEBUG_PRINT("  sizeof Ndb           : %lu", sizeof(Ndb));
179   DEBUG_PRINT("  sizeof NdbInstance   : %lu", sizeof(NdbInstance));
180   DEBUG_PRINT("  sizeof workitem      : %lu (%lu + buffer: %lu)", wi1, wi2, wi3);
181   DEBUG_PRINT("  sizeof ExternalValue : %lu", sizeof(ExternalValue));
182 #endif
183 }
184 
185 
reconfigure(Scheduler * s)186 void reconfigure(Scheduler *s) {
187   DEBUG_ENTER();
188 
189   next_config = new Configuration(active_config);
190 
191   if(! read_configuration(next_config)) {
192     logger->log(LOG_WARNING, 0, "Online reconfiguration failed.");
193   }
194   else if(! s->global_reconfigure(next_config)) {
195     logger->log(LOG_WARNING, 0,
196                 "Online configuration aborted -- not supported by scheduler.");
197   }
198   else {
199     /* There is no garbage collection here, but there could be if Configuration
200        had a carefully-written destructor. */
201     stale_config = active_config;
202     active_config = next_config;
203     next_config = 0;
204     logger->log(LOG_WARNING, 0, "ONLINE RECONFIGURATION COMPLETE");
205   }
206 }
207 
208 
209 extern "C" {
210   void * run_reconfig_listener_thread(void *);
211 }
212 
213 // TODO: This could take a GlobalConfigManager rather than a Scheduler
run_reconfig_listener_thread(void * p)214 void * run_reconfig_listener_thread(void *p) {
215   thread_identifier tid;
216   tid.pipeline = 0;
217   strcpy(tid.name,"config_listener");
218   set_thread_id(&tid);
219 
220   DEBUG_ENTER();
221   ndb_pipeline * pipeline = (ndb_pipeline *) p;
222 
223   while(1) {
224     int i = active_config->waitForReconfSignal();
225 
226     if(i == 0) {
227       DEBUG_PRINT("will listen again.");
228     }
229     else if(i == 1) {
230       DEBUG_PRINT("reconfiguring");
231       reconfigure(pipeline->scheduler);
232     }
233     else {
234       DEBUG_PRINT("error (%d); exiting.", i);
235       break;
236     }
237   }
238   return 0;
239 }
240 
241 
242 /* This function has C linkage */
start_reconfig_listener(void * pipeline)243 void start_reconfig_listener(void *pipeline) {
244   DEBUG_ENTER();
245   if(active_config->canReloadOnline()) {
246     pthread_t thd_id;
247 
248     DEBUG_PRINT("Starting thread.");
249     pthread_create(& thd_id, NULL, run_reconfig_listener_thread, pipeline);
250   }
251   else {
252     DEBUG_PRINT("Not supported.");
253   }
254 }
255