1 /*
2    Copyright (c) 2004, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef CONSUMER_HPP
26 #define CONSUMER_HPP
27 
28 #include "Restore.hpp"
29 #include "ndb_nodegroup_map.h"
30 #include "restore_tables.h"
31 #include <NdbThread.h>
32 #include <NdbCondition.h>
33 
34 class BackupConsumer {
35 public:
BackupConsumer()36   BackupConsumer() {}
~BackupConsumer()37   virtual ~BackupConsumer() { }
init(Uint32 tableCompabilityMask)38   virtual bool init(Uint32 tableCompabilityMask) { return true;}
object(Uint32 tableType,const void *)39   virtual bool object(Uint32 tableType, const void*) { return true;}
table(const TableS &)40   virtual bool table(const TableS &){return true;}
fk(Uint32 tableType,const void *)41   virtual bool fk(Uint32 tableType, const void*) { return true;}
endOfTables()42   virtual bool endOfTables() { return true; }
endOfTablesFK()43   virtual bool endOfTablesFK() { return true; }
tuple(const TupleS &,Uint32 fragId)44   virtual void tuple(const TupleS &, Uint32 fragId){}
tuple_free()45   virtual void tuple_free(){}
endOfTuples()46   virtual void endOfTuples(){}
logEntry(const LogEntry &)47   virtual void logEntry(const LogEntry &){}
endOfLogEntrys()48   virtual void endOfLogEntrys(){}
prepare_staging(const TableS &)49   virtual bool prepare_staging(const TableS &){return true;}
finalize_staging(const TableS &)50   virtual bool finalize_staging(const TableS &){return true;}
finalize_table(const TableS &)51   virtual bool finalize_table(const TableS &){return true;}
rebuild_indexes(const TableS &)52   virtual bool rebuild_indexes(const TableS &) { return true;}
createSystable(const TableS &)53   virtual bool createSystable(const TableS &){ return true;}
update_apply_status(const RestoreMetaData & metaData,bool snapshotstart)54   virtual bool update_apply_status(const RestoreMetaData &metaData, bool snapshotstart)
55     {return true;}
report_started(unsigned backup_id,unsigned node_id)56   virtual bool report_started(unsigned backup_id, unsigned node_id)
57     {return true;}
report_meta_data(unsigned backup_id,unsigned node_id)58   virtual bool report_meta_data(unsigned backup_id, unsigned node_id)
59     {return true;}
report_data(unsigned backup_id,unsigned node_id)60   virtual bool report_data(unsigned backup_id, unsigned node_id)
61     {return true;}
report_log(unsigned backup_id,unsigned node_id)62   virtual bool report_log(unsigned backup_id, unsigned node_id)
63     {return true;}
report_completed(unsigned backup_id,unsigned node_id)64   virtual bool report_completed(unsigned backup_id, unsigned node_id)
65     {return true;}
isMissingTable(const TableS &)66   virtual bool isMissingTable(const TableS &){return false;}
67   NODE_GROUP_MAP *m_nodegroup_map;
68   uint            m_nodegroup_map_len;
has_temp_error()69   virtual bool has_temp_error() {return false;}
table_equal(const TableS &)70   virtual bool table_equal(const TableS &) { return true; }
table_compatible_check(TableS &)71   virtual bool table_compatible_check(TableS &) {return true;}
check_blobs(TableS &)72   virtual bool check_blobs(TableS &) {return true;}
73 };
74 
75 /*
76  * CyclicBarrier class to sync multiple threads.
77  * To be used where there are N threads which we want to
78  * synchronize periodically at some gating point (the barrier).
79  */
80 class CyclicBarrier
81 {
82 private:
83   NdbMutex m_mutex;
84   NdbCondition m_condition;
85 
86   const Uint32 m_threads;   /* Num threads as barrier */
87   Uint32 m_waiters;         /* Num threads waiting */
88   Uint32 m_round;           /* Barrier round */
89   bool m_cancelled;         /* Has barrier been cancelled */
90 public:
91   /* Create a barrier, waiting for giving number of threads */
CyclicBarrier(const Uint32 threads)92   CyclicBarrier(const Uint32 threads):
93     m_threads(threads),
94     m_waiters(0),
95     m_round(0),
96     m_cancelled(false)
97   {
98     assert(threads > 0);
99     NdbMutex_Init(&m_mutex);
100     NdbCondition_Init(&m_condition);
101   }
102 
103   /* Destroy barrier */
~CyclicBarrier()104   ~CyclicBarrier()
105   {
106     /* Cancel and wait for any waiters to exit */
107     cancel();
108     NdbMutex_Deinit(&m_mutex);
109   }
110 
111   /**
112    * Wait for all threads to enter barrier
113    * Return true if all arrived
114    * Return false if barrier cancelled
115    */
wait()116   bool wait()
117   {
118     NdbMutex_Lock(&m_mutex);
119 
120     if (!m_cancelled)
121     {
122       Uint32 round = m_round;
123       assert(m_waiters < m_threads);
124       m_waiters ++;
125       if (m_waiters == m_threads)
126       {
127         /* Barrier opens and re-cycles */
128         m_round ++;
129         m_waiters = 0;
130         NdbCondition_Broadcast(&m_condition);
131       }
132       else
133       {
134         /* Not everyone here yet, wait */
135         while ((round == m_round) &&
136                (!m_cancelled))
137         {
138           NdbCondition_Wait(&m_condition,
139                             &m_mutex);
140         }
141 
142         if (m_cancelled)
143         {
144           /**
145            * If we were not yet woken
146            * when the barrier was cancelled
147            * then account for #waiters
148            * to allow safe cleanup
149            */
150           if (round == m_round)
151           {
152             assert(m_waiters > 0);
153             m_waiters --;
154             NdbCondition_Signal(&m_condition);
155           }
156         }
157       }
158     }
159     bool normal_wake = !m_cancelled;
160     NdbMutex_Unlock(&m_mutex);
161 
162     return normal_wake;
163   }
164 
165   /**
166    * Cancel barrier
167    * Any waiters will be woken with an error
168    * No further use can be made of the barrier.
169    */
cancel()170   void cancel()
171   {
172     NdbMutex_Lock(&m_mutex);
173     {
174       m_cancelled = true;
175       NdbCondition_Broadcast(&m_condition);
176       while (m_waiters > 0)
177       {
178         NdbCondition_Wait(&m_condition,
179                           &m_mutex);
180       }
181     }
182     NdbMutex_Unlock(&m_mutex);
183   }
184 };
185 
186 class RestoreThreadData {
187 public:
188   Uint32 m_part_id;
189   int m_result;
190   bool m_restore_meta;
191   NdbThread *m_thread;
192   Vector<BackupConsumer*> m_consumers;
RestoreThreadData(Uint32 part_id)193   RestoreThreadData(Uint32 part_id)
194                     : m_part_id(part_id), m_result(0), m_restore_meta(false),
195                       m_thread(NULL) {}
196   CyclicBarrier *m_barrier;
RestoreThreadData(Uint32 partId,CyclicBarrier * barrier)197   RestoreThreadData(Uint32 partId, CyclicBarrier *barrier): m_part_id(partId),
198      m_result(0), m_restore_meta(false), m_thread(NULL), m_barrier(barrier) {}
~RestoreThreadData()199   ~RestoreThreadData() {}
200 };
201 
202 #endif
203