1 /* Copyright (c) 2009, 2021, Oracle and/or its affiliates. 2 All rights reserved. Use is subject to license terms. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License, version 2.0, 6 as published by the Free Software Foundation. 7 8 This program is also distributed with certain software (including 9 but not limited to OpenSSL) that is licensed under separate terms, 10 as designated in a particular file or component or in included license 11 documentation. The authors of MySQL hereby grant you an additional 12 permission to link the program and your derivative works with the 13 separately licensed software that they have included with MySQL. 14 15 This program is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License, version 2.0, for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with this program; if not, write to the Free Software 22 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ 23 24 #ifndef Defragger_H 25 #define Defragger_H 26 27 28 /* 29 reception of fragmented signals 30 - defragments signal based on nodeid and fragmentid 31 */ 32 33 class Defragger { 34 struct DefragBuffer { 35 // Key 36 Uint32 m_fragment_id; 37 NodeId m_node_id; 38 // Data 39 UtilBuffer m_buffer; DefragBufferDefragger::DefragBuffer40 DefragBuffer(NodeId nodeId, Uint32 fragId) : 41 m_fragment_id(fragId), m_node_id(nodeId) {} 42 }; 43 Vector<DefragBuffer*> m_buffers; 44 find_buffer(NodeId nodeId,Uint32 fragId)45 DefragBuffer* find_buffer(NodeId nodeId, Uint32 fragId){ 46 for (unsigned i = 0; i < m_buffers.size(); i++) 47 { 48 DefragBuffer* dbuf = m_buffers[i]; 49 if (dbuf->m_node_id == nodeId && 50 dbuf->m_fragment_id == fragId) 51 return dbuf; 52 } 53 return NULL; 54 } 55 erase_buffer(const DefragBuffer * dbuf)56 void erase_buffer(const DefragBuffer* dbuf){ 57 for (unsigned i = 0; i < m_buffers.size(); i++) 58 { 59 if (m_buffers[i] == dbuf) 60 { 61 delete dbuf; 62 m_buffers.erase(i); 63 return; 64 } 65 } 66 assert(false); // Should never be reached 67 } 68 69 public: Defragger()70 Defragger() {}; ~Defragger()71 ~Defragger() 72 { 73 for (unsigned i = m_buffers.size(); i > 0; --i) 74 { 75 delete m_buffers[i-1]; // free the memory of the fragment 76 } 77 // m_buffers will be freed by ~Vector 78 }; 79 80 /* 81 return true when complete signal received 82 */ 83 defragment(SimpleSignal * sig)84 bool defragment(SimpleSignal* sig) { 85 86 if (!sig->isFragmented()) 87 return true; 88 89 Uint32 fragId = sig->getFragmentId(); 90 NodeId nodeId = refToNode(sig->header.theSendersBlockRef); 91 92 DefragBuffer* dbuf; 93 if(sig->isFirstFragment()){ 94 95 // Make sure buffer does not exist 96 if (find_buffer(nodeId, fragId)) 97 abort(); 98 99 dbuf = new DefragBuffer(nodeId, fragId); 100 m_buffers.push_back(dbuf); 101 102 } else { 103 dbuf = find_buffer(nodeId, fragId); 104 if (dbuf == NULL) 105 abort(); 106 } 107 if (dbuf->m_buffer.append(sig->ptr[0].p, sig->ptr[0].sz * sizeof(Uint32))) 108 abort(); // OOM 109 110 if (!sig->isLastFragment()) 111 return false; 112 113 // Copy defragmented data into signal... 114 int length = dbuf->m_buffer.length(); 115 delete[] sig->ptr[0].p; 116 sig->ptr[0].sz = (length+3)/4; 117 sig->ptr[0].p = new Uint32[sig->ptr[0].sz]; 118 memcpy(sig->ptr[0].p, dbuf->m_buffer.get_data(), length); 119 120 // erase the buffer data 121 erase_buffer(dbuf); 122 return true; 123 } 124 125 126 /* 127 clear any unassembled signal buffers from node 128 */ node_failed(NodeId nodeId)129 void node_failed(NodeId nodeId) { 130 for (unsigned i = m_buffers.size(); i > 0; --i) 131 { 132 if (m_buffers[i-1]->m_node_id == nodeId) 133 { 134 delete m_buffers[i]; // free the memory of the signal fragment 135 m_buffers.erase(i); // remove the reference from the vector. 136 } 137 } 138 } 139 140 }; 141 142 #endif 143