1 /*
2  Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights
3  reserved.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License, version 2.0,
7  as published by the Free Software Foundation.
8 
9  This program is also distributed with certain software (including
10  but not limited to OpenSSL) that is licensed under separate terms,
11  as designated in a particular file or component or in included license
12  documentation.  The authors of MySQL hereby grant you an additional
13  permission to link the program and your derivative works with the
14  separately licensed software that they have included with MySQL.
15 
16  This program is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  GNU General Public License, version 2.0, for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with this program; if not, write to the Free Software
23  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
24  02110-1301  USA
25  */
26 
27 #ifndef NDBMEMCACHE_S_SCHEDULER_H
28 #define NDBMEMCACHE_S_SCHEDULER_H
29 
30 #ifndef __cplusplus
31 #error "This file is for C++ only"
32 #endif
33 
34 #include <memcached/types.h>
35 
36 #include <NdbWaitGroup.hpp>
37 
38 #include "ndbmemcache_config.h"
39 #include "Scheduler.h"
40 #include "Queue.h"
41 #include "GlobalConfigManager.h"
42 
43 /*
44  *
45  * This scheduler uses many Ndb objects and runs in three threads:
46  *    the memcache worker thread prepares transactions
47  *    the send thread sends them
48  *    the poll thread waits for them to complete and then polls them.
49  *
50  *  Class S::SchedulerWorker implents the Scheduler interface
51  */
52 
53 class S {
54 public:
55   class SchedulerGlobal;     // a global singleton
56   class SchedulerWorker;     // one object per memcached worker thread
57   class Cluster;             // one object for each cluster
58   class Connection;          // one object per connection to a cluster
59   class WorkerConnection;    // one object per {worker,connection} pair
60 };
61 
62 
63 /* The SchedulerGlobal singleton
64 */
65 class S::SchedulerGlobal : public GlobalConfigManager {
66   friend class S::Cluster;
67   friend class S::Connection;
68 
69 public:
70   SchedulerGlobal(int);
~SchedulerGlobal()71   ~SchedulerGlobal() {};
72   void init(const scheduler_options *options);
73   void add_stats(const char *, ADD_STAT, const void *);
74   void shutdown();
getWorkerConnectionPtr(int thd,int cluster)75   WorkerConnection ** getWorkerConnectionPtr(int thd, int cluster) const {
76     return (WorkerConnection **) getSchedulerConfigManagerPtr(thd, cluster);
77   }
78 
79   const char * config_string;
80   Cluster ** clusters;
81 
82   struct {
83     int n_worker_threads;  /** number of memcached worker threads */
84     int n_connections;     /** preferred number of NDB cluster connections */
85     int force_send;        /** how to use NDB force-send */
86     int send_timer;        /** milliseconds to set for adaptive send timer */
87     int auto_grow;         /** whether to allow NDB instance pool to grow */
88     int max_clients;       /** memcached max allowed connections */
89   } options;
90 
91 private:
92   void parse_config_string(int threads, const char *config_string);
93   bool running;
94 };
95 
96 
97 /* S::SchedulerWorker implements the Scheduler interface.
98    There will be one SchedulerWorker for each memcached worker thread,
99    and attached to each NDB request pipeline.
100  */
101 class S::SchedulerWorker : public Scheduler {
102 public:
SchedulerWorker()103   SchedulerWorker() {};
104   ~SchedulerWorker();
105   void init(int threadnum, const scheduler_options * sched_opts);
attach_thread(thread_identifier *)106   void attach_thread(thread_identifier *) {};
107   ENGINE_ERROR_CODE schedule(workitem *);
108   void prepare(NdbTransaction *, NdbTransaction::ExecType,
109                NdbAsynchCallback, workitem *, prepare_flags);
110   void close(NdbTransaction *, workitem *);
111   void release(workitem *);
112   void add_stats(const char *, ADD_STAT, const void *);
113   void shutdown();
114   bool global_reconfigure(Configuration *);
115 
116 private:
117   int id;
118   ndb_pipeline *pipeline;
119   SchedulerGlobal * m_global;
120 };
121 
122 
123 /* For each connected cluster, there is one S::Cluster
124 */
125 class S::Cluster {
126 public:
127   Cluster(SchedulerGlobal *, int id);
128   ~Cluster();
129   void add_stats(const char *, ADD_STAT, const void *);
130   WorkerConnection ** getWorkerConnectionPtr(int thd) const;
131   void startThreads();
132 
133   bool threads_started;
134   int cluster_id;
135   int nconnections;
136   int nreferences;
137   Connection ** connections;
138 };
139 
140 
141 /* For each Ndb_cluster_connection, there is one instance of Connection,
142    which runs a send thread and a poll thread.
143 */
144 class S::Connection {
145   friend class S::SchedulerWorker;
146   friend class S::WorkerConnection;
147 
148 public:
149   Connection(Cluster &, int connection_id);
150   ~Connection();
151   void add_stats(const char *, ADD_STAT, const void *);
152   void startThreads();
153 
154   /* These are not intended to be part of the public API, but are marked as
155      public so that they can be called from C code in pthread_create(): */
156   void * run_ndb_send_thread();
157   void * run_ndb_poll_thread();
158 
159 private:
160   const Cluster & cluster;
161   Ndb_cluster_connection * conn;
162   NdbWaitGroup *pollgroup;
163   Queue<NdbInstance> * sentqueue;
164   Queue<NdbInstance> * reschedulequeue;
165   int id;
166   int node_id;
167   int n_total_workers;   /* same as SchedulerGlobal::options.n_worker_threads */
168   int n_workers;         /* number of workers for this connection */
169   struct {
170     int initial;         /* start with this many NDB instances */
171     int max;             /* scale up to this many */
172   } instances;
173   pthread_t send_thread_id;
174   pthread_t poll_thread_id;
175   struct {
176     pthread_mutex_t lock;
177     pthread_cond_t not_zero;
178     unsigned int counter;
179   } sem;
180   struct {
181     Uint64 sent_operations;
182     Uint64 batches;
183     Uint64 timeout_races;
184   } stats;
185 
186   int get_operations_from_queue(NdbInstance **readylist, Queue<NdbInstance> *q);
187 };
188 
189 
190 /* For each {connection, worker} tuple there is a WorkerConnection
191 */
192 class S::WorkerConnection : public SchedulerConfigManager {
193 public:
194   WorkerConnection(SchedulerGlobal *, int thd_id, int cluster_id);
195   ~WorkerConnection();
196   void shutdown();
197   void reconfigure(Configuration *);
198   NdbInstance * newNdbInstance();
199 
200   struct {
201     int thd           : 8;
202     int cluster       : 8;
203     int conn          : 8;
204     unsigned int node : 8;
205   } id;
206   struct {
207     int initial;
208     int current;
209     int max;
210   } instances;
211   S::Connection *conn;
212   NdbInstance *freelist;
213   Queue<NdbInstance> * sendqueue;
214 };
215 
216 #endif
217