1 /* 2 Copyright (c) 2004, 2019, Oracle and/or its affiliates. All rights reserved. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License, version 2.0, 6 as published by the Free Software Foundation. 7 8 This program is also distributed with certain software (including 9 but not limited to OpenSSL) that is licensed under separate terms, 10 as designated in a particular file or component or in included license 11 documentation. The authors of MySQL hereby grant you an additional 12 permission to link the program and your derivative works with the 13 separately licensed software that they have included with MySQL. 14 15 This program is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License, version 2.0, for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with this program; if not, write to the Free Software 22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 23 */ 24 25 #ifndef CONSUMER_HPP 26 #define CONSUMER_HPP 27 28 #include "Restore.hpp" 29 #include "ndb_nodegroup_map.h" 30 #include "restore_tables.h" 31 #include <NdbThread.h> 32 #include <NdbCondition.h> 33 34 class BackupConsumer { 35 public: BackupConsumer()36 BackupConsumer() {} ~BackupConsumer()37 virtual ~BackupConsumer() { } init(Uint32 tableCompabilityMask)38 virtual bool init(Uint32 tableCompabilityMask) { return true;} object(Uint32 tableType,const void *)39 virtual bool object(Uint32 tableType, const void*) { return true;} table(const TableS &)40 virtual bool table(const TableS &){return true;} fk(Uint32 tableType,const void *)41 virtual bool fk(Uint32 tableType, const void*) { return true;} endOfTables()42 virtual bool endOfTables() { return true; } endOfTablesFK()43 virtual bool endOfTablesFK() { return true; } tuple(const TupleS &,Uint32 fragId)44 virtual void tuple(const TupleS &, Uint32 fragId){} tuple_free()45 virtual void tuple_free(){} endOfTuples()46 virtual void endOfTuples(){} logEntry(const LogEntry &)47 virtual void logEntry(const LogEntry &){} endOfLogEntrys()48 virtual void endOfLogEntrys(){} prepare_staging(const TableS &)49 virtual bool prepare_staging(const TableS &){return true;} finalize_staging(const TableS &)50 virtual bool finalize_staging(const TableS &){return true;} finalize_table(const TableS &)51 virtual bool finalize_table(const TableS &){return true;} rebuild_indexes(const TableS &)52 virtual bool rebuild_indexes(const TableS &) { return true;} createSystable(const TableS &)53 virtual bool createSystable(const TableS &){ return true;} update_apply_status(const RestoreMetaData & metaData,bool snapshotstart)54 virtual bool update_apply_status(const RestoreMetaData &metaData, bool snapshotstart) 55 {return true;} report_started(unsigned backup_id,unsigned node_id)56 virtual bool report_started(unsigned backup_id, unsigned node_id) 57 {return true;} report_meta_data(unsigned backup_id,unsigned node_id)58 virtual bool report_meta_data(unsigned backup_id, unsigned node_id) 59 {return true;} report_data(unsigned backup_id,unsigned node_id)60 virtual bool report_data(unsigned backup_id, unsigned node_id) 61 {return true;} report_log(unsigned backup_id,unsigned node_id)62 virtual bool report_log(unsigned backup_id, unsigned node_id) 63 {return true;} report_completed(unsigned backup_id,unsigned node_id)64 virtual bool report_completed(unsigned backup_id, unsigned node_id) 65 {return true;} isMissingTable(const TableS &)66 virtual bool isMissingTable(const TableS &){return false;} 67 NODE_GROUP_MAP *m_nodegroup_map; 68 uint m_nodegroup_map_len; has_temp_error()69 virtual bool has_temp_error() {return false;} table_equal(const TableS &)70 virtual bool table_equal(const TableS &) { return true; } table_compatible_check(TableS &)71 virtual bool table_compatible_check(TableS &) {return true;} check_blobs(TableS &)72 virtual bool check_blobs(TableS &) {return true;} 73 }; 74 75 /* 76 * CyclicBarrier class to sync multiple threads. 77 * To be used where there are N threads which we want to 78 * synchronize periodically at some gating point (the barrier). 79 */ 80 class CyclicBarrier 81 { 82 private: 83 NdbMutex m_mutex; 84 NdbCondition m_condition; 85 86 const Uint32 m_threads; /* Num threads as barrier */ 87 Uint32 m_waiters; /* Num threads waiting */ 88 Uint32 m_round; /* Barrier round */ 89 bool m_cancelled; /* Has barrier been cancelled */ 90 public: 91 /* Create a barrier, waiting for giving number of threads */ CyclicBarrier(const Uint32 threads)92 CyclicBarrier(const Uint32 threads): 93 m_threads(threads), 94 m_waiters(0), 95 m_round(0), 96 m_cancelled(false) 97 { 98 assert(threads > 0); 99 NdbMutex_Init(&m_mutex); 100 NdbCondition_Init(&m_condition); 101 } 102 103 /* Destroy barrier */ ~CyclicBarrier()104 ~CyclicBarrier() 105 { 106 /* Cancel and wait for any waiters to exit */ 107 cancel(); 108 NdbMutex_Deinit(&m_mutex); 109 } 110 111 /** 112 * Wait for all threads to enter barrier 113 * Return true if all arrived 114 * Return false if barrier cancelled 115 */ wait()116 bool wait() 117 { 118 NdbMutex_Lock(&m_mutex); 119 120 if (!m_cancelled) 121 { 122 Uint32 round = m_round; 123 assert(m_waiters < m_threads); 124 m_waiters ++; 125 if (m_waiters == m_threads) 126 { 127 /* Barrier opens and re-cycles */ 128 m_round ++; 129 m_waiters = 0; 130 NdbCondition_Broadcast(&m_condition); 131 } 132 else 133 { 134 /* Not everyone here yet, wait */ 135 while ((round == m_round) && 136 (!m_cancelled)) 137 { 138 NdbCondition_Wait(&m_condition, 139 &m_mutex); 140 } 141 142 if (m_cancelled) 143 { 144 /** 145 * If we were not yet woken 146 * when the barrier was cancelled 147 * then account for #waiters 148 * to allow safe cleanup 149 */ 150 if (round == m_round) 151 { 152 assert(m_waiters > 0); 153 m_waiters --; 154 NdbCondition_Signal(&m_condition); 155 } 156 } 157 } 158 } 159 bool normal_wake = !m_cancelled; 160 NdbMutex_Unlock(&m_mutex); 161 162 return normal_wake; 163 } 164 165 /** 166 * Cancel barrier 167 * Any waiters will be woken with an error 168 * No further use can be made of the barrier. 169 */ cancel()170 void cancel() 171 { 172 NdbMutex_Lock(&m_mutex); 173 { 174 m_cancelled = true; 175 NdbCondition_Broadcast(&m_condition); 176 while (m_waiters > 0) 177 { 178 NdbCondition_Wait(&m_condition, 179 &m_mutex); 180 } 181 } 182 NdbMutex_Unlock(&m_mutex); 183 } 184 }; 185 186 class RestoreThreadData { 187 public: 188 Uint32 m_part_id; 189 int m_result; 190 bool m_restore_meta; 191 NdbThread *m_thread; 192 Vector<BackupConsumer*> m_consumers; RestoreThreadData(Uint32 part_id)193 RestoreThreadData(Uint32 part_id) 194 : m_part_id(part_id), m_result(0), m_restore_meta(false), 195 m_thread(NULL) {} 196 CyclicBarrier *m_barrier; RestoreThreadData(Uint32 partId,CyclicBarrier * barrier)197 RestoreThreadData(Uint32 partId, CyclicBarrier *barrier): m_part_id(partId), 198 m_result(0), m_restore_meta(false), m_thread(NULL), m_barrier(barrier) {} ~RestoreThreadData()199 ~RestoreThreadData() {} 200 }; 201 202 #endif 203