1 /* Copyright (C) 2009 Sun Microsystems, Inc.
2      All rights reserved. Use is subject to license terms.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
23 
24 #ifndef Defragger_H
25 #define Defragger_H
26 
27 
28 /*
29   reception of fragmented signals
30   - defragments signal based on nodeid and fragmentid
31 */
32 
33 class Defragger {
34   struct DefragBuffer {
35     // Key
36     Uint32 m_fragment_id;
37     NodeId m_node_id;
38     // Data
39     UtilBuffer m_buffer;
DefragBufferDefragger::DefragBuffer40     DefragBuffer(NodeId nodeId, Uint32 fragId) :
41       m_fragment_id(fragId), m_node_id(nodeId) {}
42   };
43   Vector<DefragBuffer*> m_buffers;
44 
find_buffer(NodeId nodeId,Uint32 fragId)45   DefragBuffer* find_buffer(NodeId nodeId, Uint32 fragId){
46     for (unsigned i = 0; i < m_buffers.size(); i++)
47     {
48       DefragBuffer* dbuf = m_buffers[i];
49       if (dbuf->m_node_id == nodeId &&
50           dbuf->m_fragment_id == fragId)
51         return dbuf;
52     }
53     return NULL;
54   }
55 
erase_buffer(const DefragBuffer * dbuf)56   void erase_buffer(const DefragBuffer* dbuf){
57     for (unsigned i = 0; i < m_buffers.size(); i++)
58     {
59       if (m_buffers[i] == dbuf)
60       {
61         delete dbuf;
62         m_buffers.erase(i);
63         return;
64       }
65     }
66     assert(false); // Should never be reached
67   }
68 
69 public:
Defragger()70   Defragger() {};
~Defragger()71   ~Defragger()
72   {
73     for (unsigned i = m_buffers.size(); i > 0; --i)
74     {
75       delete m_buffers[i-1]; // free the memory of the fragment
76     }
77     // m_buffers will be freed by ~Vector
78   };
79 
80   /*
81     return true when complete signal received
82   */
83 
defragment(SimpleSignal * sig)84   bool defragment(SimpleSignal* sig) {
85 
86     if (!sig->isFragmented())
87       return true;
88 
89     Uint32 fragId = sig->getFragmentId();
90     NodeId nodeId = refToNode(sig->header.theSendersBlockRef);
91 
92     DefragBuffer* dbuf;
93     if(sig->isFirstFragment()){
94 
95       // Make sure buffer does not exist
96       if (find_buffer(nodeId, fragId))
97         abort();
98 
99       dbuf = new DefragBuffer(nodeId, fragId);
100       m_buffers.push_back(dbuf);
101 
102     } else {
103       dbuf = find_buffer(nodeId, fragId);
104       if (dbuf == NULL)
105         abort();
106     }
107     if (dbuf->m_buffer.append(sig->ptr[0].p, sig->ptr[0].sz * sizeof(Uint32)))
108       abort(); // OOM
109 
110     if (!sig->isLastFragment())
111       return false;
112 
113     // Copy defragmented data into signal...
114     int length = dbuf->m_buffer.length();
115     delete[] sig->ptr[0].p;
116     sig->ptr[0].sz = (length+3)/4;
117     sig->ptr[0].p = new Uint32[sig->ptr[0].sz];
118     memcpy(sig->ptr[0].p, dbuf->m_buffer.get_data(), length);
119 
120     // erase the buffer data
121     erase_buffer(dbuf);
122     return true;
123   }
124 
125 
126   /*
127     clear any unassembled signal buffers from node
128   */
node_failed(NodeId nodeId)129   void node_failed(NodeId nodeId) {
130     for (unsigned i = m_buffers.size(); i > 0; --i)
131     {
132       if (m_buffers[i-1]->m_node_id == nodeId)
133       {
134         delete m_buffers[i]; // free the memory of the signal fragment
135 	m_buffers.erase(i); // remove the reference from the vector.
136       }
137     }
138   }
139 
140 };
141 
142 #endif
143