1 /* 2 Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights 3 reserved. 4 5 This program is free software; you can redistribute it and/or modify 6 it under the terms of the GNU General Public License, version 2.0, 7 as published by the Free Software Foundation. 8 9 This program is also distributed with certain software (including 10 but not limited to OpenSSL) that is licensed under separate terms, 11 as designated in a particular file or component or in included license 12 documentation. The authors of MySQL hereby grant you an additional 13 permission to link the program and your derivative works with the 14 separately licensed software that they have included with MySQL. 15 16 This program is distributed in the hope that it will be useful, 17 but WITHOUT ANY WARRANTY; without even the implied warranty of 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 19 GNU General Public License, version 2.0, for more details. 20 21 You should have received a copy of the GNU General Public License 22 along with this program; if not, write to the Free Software 23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 24 02110-1301 USA 25 */ 26 27 #ifndef NDBMEMCACHE_S_SCHEDULER_H 28 #define NDBMEMCACHE_S_SCHEDULER_H 29 30 #ifndef __cplusplus 31 #error "This file is for C++ only" 32 #endif 33 34 #include <memcached/types.h> 35 36 #include <NdbWaitGroup.hpp> 37 38 #include "ndbmemcache_config.h" 39 #include "Scheduler.h" 40 #include "Queue.h" 41 #include "GlobalConfigManager.h" 42 43 /* 44 * 45 * This scheduler uses many Ndb objects and runs in three threads: 46 * the memcache worker thread prepares transactions 47 * the send thread sends them 48 * the poll thread waits for them to complete and then polls them. 49 * 50 * Class S::SchedulerWorker implents the Scheduler interface 51 */ 52 53 class S { 54 public: 55 class SchedulerGlobal; // a global singleton 56 class SchedulerWorker; // one object per memcached worker thread 57 class Cluster; // one object for each cluster 58 class Connection; // one object per connection to a cluster 59 class WorkerConnection; // one object per {worker,connection} pair 60 }; 61 62 63 /* The SchedulerGlobal singleton 64 */ 65 class S::SchedulerGlobal : public GlobalConfigManager { 66 friend class S::Cluster; 67 friend class S::Connection; 68 69 public: 70 SchedulerGlobal(int); ~SchedulerGlobal()71 ~SchedulerGlobal() {}; 72 void init(const scheduler_options *options); 73 void add_stats(const char *, ADD_STAT, const void *); 74 void shutdown(); getWorkerConnectionPtr(int thd,int cluster)75 WorkerConnection ** getWorkerConnectionPtr(int thd, int cluster) const { 76 return (WorkerConnection **) getSchedulerConfigManagerPtr(thd, cluster); 77 } 78 79 const char * config_string; 80 Cluster ** clusters; 81 82 struct { 83 int n_worker_threads; /** number of memcached worker threads */ 84 int n_connections; /** preferred number of NDB cluster connections */ 85 int force_send; /** how to use NDB force-send */ 86 int send_timer; /** milliseconds to set for adaptive send timer */ 87 int auto_grow; /** whether to allow NDB instance pool to grow */ 88 int max_clients; /** memcached max allowed connections */ 89 } options; 90 91 private: 92 void parse_config_string(int threads, const char *config_string); 93 bool running; 94 }; 95 96 97 /* S::SchedulerWorker implements the Scheduler interface. 98 There will be one SchedulerWorker for each memcached worker thread, 99 and attached to each NDB request pipeline. 100 */ 101 class S::SchedulerWorker : public Scheduler { 102 public: SchedulerWorker()103 SchedulerWorker() {}; 104 ~SchedulerWorker(); 105 void init(int threadnum, const scheduler_options * sched_opts); attach_thread(thread_identifier *)106 void attach_thread(thread_identifier *) {}; 107 ENGINE_ERROR_CODE schedule(workitem *); 108 void prepare(NdbTransaction *, NdbTransaction::ExecType, 109 NdbAsynchCallback, workitem *, prepare_flags); 110 void close(NdbTransaction *, workitem *); 111 void release(workitem *); 112 void add_stats(const char *, ADD_STAT, const void *); 113 void shutdown(); 114 bool global_reconfigure(Configuration *); 115 116 private: 117 int id; 118 ndb_pipeline *pipeline; 119 SchedulerGlobal * m_global; 120 }; 121 122 123 /* For each connected cluster, there is one S::Cluster 124 */ 125 class S::Cluster { 126 public: 127 Cluster(SchedulerGlobal *, int id); 128 ~Cluster(); 129 void add_stats(const char *, ADD_STAT, const void *); 130 WorkerConnection ** getWorkerConnectionPtr(int thd) const; 131 void startThreads(); 132 133 bool threads_started; 134 int cluster_id; 135 int nconnections; 136 int nreferences; 137 Connection ** connections; 138 }; 139 140 141 /* For each Ndb_cluster_connection, there is one instance of Connection, 142 which runs a send thread and a poll thread. 143 */ 144 class S::Connection { 145 friend class S::SchedulerWorker; 146 friend class S::WorkerConnection; 147 148 public: 149 Connection(Cluster &, int connection_id); 150 ~Connection(); 151 void add_stats(const char *, ADD_STAT, const void *); 152 void startThreads(); 153 154 /* These are not intended to be part of the public API, but are marked as 155 public so that they can be called from C code in pthread_create(): */ 156 void * run_ndb_send_thread(); 157 void * run_ndb_poll_thread(); 158 159 private: 160 const Cluster & cluster; 161 Ndb_cluster_connection * conn; 162 NdbWaitGroup *pollgroup; 163 Queue<NdbInstance> * sentqueue; 164 Queue<NdbInstance> * reschedulequeue; 165 int id; 166 int node_id; 167 int n_total_workers; /* same as SchedulerGlobal::options.n_worker_threads */ 168 int n_workers; /* number of workers for this connection */ 169 struct { 170 int initial; /* start with this many NDB instances */ 171 int max; /* scale up to this many */ 172 } instances; 173 pthread_t send_thread_id; 174 pthread_t poll_thread_id; 175 struct { 176 pthread_mutex_t lock; 177 pthread_cond_t not_zero; 178 unsigned int counter; 179 } sem; 180 struct { 181 Uint64 sent_operations; 182 Uint64 batches; 183 Uint64 timeout_races; 184 } stats; 185 186 int get_operations_from_queue(NdbInstance **readylist, Queue<NdbInstance> *q); 187 }; 188 189 190 /* For each {connection, worker} tuple there is a WorkerConnection 191 */ 192 class S::WorkerConnection : public SchedulerConfigManager { 193 public: 194 WorkerConnection(SchedulerGlobal *, int thd_id, int cluster_id); 195 ~WorkerConnection(); 196 void shutdown(); 197 void reconfigure(Configuration *); 198 NdbInstance * newNdbInstance(); 199 200 struct { 201 int thd : 8; 202 int cluster : 8; 203 int conn : 8; 204 unsigned int node : 8; 205 } id; 206 struct { 207 int initial; 208 int current; 209 int max; 210 } instances; 211 S::Connection *conn; 212 NdbInstance *freelist; 213 Queue<NdbInstance> * sendqueue; 214 }; 215 216 #endif 217