1 /*
2 Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <cstring>
26 #include <new>
27
28 #include "ndb_global.h"
29 #include "ndb_types.h"
30 #include "NdbApiSignal.hpp"
31 #include "AssembleFragments.hpp"
32
AssembleBatchedFragments()33 AssembleBatchedFragments::AssembleBatchedFragments()
34 : m_sender_ref(0),
35 m_section_memory(nullptr)
36 {}
37
~AssembleBatchedFragments()38 AssembleBatchedFragments::~AssembleBatchedFragments()
39 {
40 delete[] m_section_memory;
41 }
42
43 bool
setup(Uint32 size)44 AssembleBatchedFragments::setup(Uint32 size)
45 {
46 require(m_section_memory == nullptr);
47 m_section_memory = new (std::nothrow) Uint32[size];
48 m_size = size;
49 return m_section_memory != nullptr;
50 }
51
52 /** extract fills in the assembled signal and its sections into signal and
53 * ptr and returns the number of sections.
54 */
55 Uint32
extract(NdbApiSignal * signal,LinearSectionPtr ptr[3]) const56 AssembleBatchedFragments::extract(NdbApiSignal* signal,
57 LinearSectionPtr ptr[3]) const
58 {
59 NdbApiSignal sig{m_sigheader};
60 sig.setDataPtr(sig.getDataPtrSend());
61 std::memcpy(sig.getDataPtrSend(), m_theData, sig.theLength * 4);
62
63 *signal = sig;
64 signal->m_noOfSections = m_section_count;
65
66 Uint32* p = m_section_memory;
67 Uint32 sec_idx;
68 Uint32 sec_cnt = 0;
69 for (sec_idx = 0; sec_idx < 3; sec_idx++)
70 {
71 ptr[sec_idx].p = p + m_section_offset[sec_idx];
72 ptr[sec_idx].sz = m_section_size[sec_idx];
73 if (ptr[sec_idx].sz > 0)
74 {
75 sec_cnt = sec_idx + 1;
76 }
77 }
78 signal->m_noOfSections = sec_cnt;
79 return signal->m_noOfSections;
80 }
81
82 void
cleanup()83 AssembleBatchedFragments::cleanup()
84 {
85 require(m_section_memory != nullptr);
86 delete[] m_section_memory;
87 m_section_memory = nullptr;
88 m_size = 0;
89 m_sender_ref = 0;
90 }
91
92 void
extract_signal_only(NdbApiSignal * signal)93 AssembleBatchedFragments::extract_signal_only(NdbApiSignal* signal)
94 {
95 require(m_section_memory == nullptr);
96
97 NdbApiSignal sig{m_sigheader};
98 sig.setDataPtr(sig.getDataPtrSend());
99 std::memcpy(sig.getDataPtrSend(), m_theData, sig.theLength*4);
100
101 *signal = sig;
102 signal->m_noOfSections = m_section_count;
103 }
104
105 AssembleBatchedFragments::Result
do_assemble(const NdbApiSignal * signal,const LinearSectionPtr ptr[3])106 AssembleBatchedFragments::do_assemble(const NdbApiSignal* signal,
107 const LinearSectionPtr ptr[3])
108 {
109 if (signal->isFirstFragment())
110 {
111 m_sigheader = *signal; // slice intended
112 std::memcpy(m_theData, signal->getDataPtr(), signal->theLength * 4);
113 m_sigheader.theLength = signal->theLength - signal->m_noOfSections - 1;
114 m_sigheader.m_noOfSections = 0;
115 m_sender_ref = signal->theSendersBlockRef;
116 m_fragment_id = signal->getFragmentId();
117 m_offset = 0;
118 m_section_count = 0;
119 for (int i = 0; i < 3; i++)
120 {
121 m_section_offset[i] = 0;
122 m_section_size[i] = 0;
123 }
124 }
125
126 const Uint32 sec_cnt = signal->m_noOfSections;
127 for (Uint32 sec_idx = 0; sec_idx < sec_cnt; sec_idx++)
128 {
129 const Uint32 sec_num = signal->getFragmentSectionNumber(sec_idx);
130
131 require(sec_num < 3);
132 if (m_size - m_offset < ptr[sec_idx].sz)
133 {
134 // Drop collected section data
135 cleanup();
136 return Result::ERR_DATA_DROPPED; // No space left
137 }
138 if (m_section_size[sec_num] == 0)
139 {
140 require(m_section_offset[sec_num] == 0);
141 m_section_offset[sec_num] = m_offset;
142 }
143 std::memcpy(m_section_memory + m_offset, ptr[sec_idx].p, ptr[sec_idx].sz * 4);
144 m_offset += ptr[sec_idx].sz;
145 m_section_size[sec_num] += ptr[sec_idx].sz;
146 }
147 if (!signal->isLastFragment())
148 {
149 return Result::NEED_MORE;
150 }
151 if (m_offset != m_size)
152 {
153 // Drop collected section data
154 cleanup();
155 return Result::ERR_MESSAGE_INCOMPLETE;
156 }
157 return Result::MESSAGE_COMPLETE;
158 }
159