1 /*
2    Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <cstring>
26 #include <new>
27 
28 #include "ndb_global.h"
29 #include "ndb_types.h"
30 #include "NdbApiSignal.hpp"
31 #include "AssembleFragments.hpp"
32 
AssembleBatchedFragments()33 AssembleBatchedFragments::AssembleBatchedFragments()
34 :  m_sender_ref(0),
35    m_section_memory(nullptr)
36 {}
37 
~AssembleBatchedFragments()38 AssembleBatchedFragments::~AssembleBatchedFragments()
39 {
40   delete[] m_section_memory;
41 }
42 
43 bool
setup(Uint32 size)44 AssembleBatchedFragments::setup(Uint32 size)
45 {
46   require(m_section_memory == nullptr);
47   m_section_memory = new (std::nothrow) Uint32[size];
48   m_size = size;
49   return m_section_memory != nullptr;
50 }
51 
52 /** extract fills in the assembled signal and its sections into signal and
53  * ptr and returns the number of sections.
54  */
55 Uint32
extract(NdbApiSignal * signal,LinearSectionPtr ptr[3]) const56 AssembleBatchedFragments::extract(NdbApiSignal* signal,
57                                   LinearSectionPtr ptr[3]) const
58 {
59   NdbApiSignal sig{m_sigheader};
60   sig.setDataPtr(sig.getDataPtrSend());
61   std::memcpy(sig.getDataPtrSend(), m_theData, sig.theLength * 4);
62 
63   *signal = sig;
64   signal->m_noOfSections = m_section_count;
65 
66   Uint32* p = m_section_memory;
67   Uint32 sec_idx;
68   Uint32 sec_cnt = 0;
69   for (sec_idx = 0; sec_idx < 3; sec_idx++)
70   {
71     ptr[sec_idx].p = p + m_section_offset[sec_idx];
72     ptr[sec_idx].sz = m_section_size[sec_idx];
73     if (ptr[sec_idx].sz > 0)
74     {
75       sec_cnt = sec_idx + 1;
76     }
77   }
78   signal->m_noOfSections = sec_cnt;
79   return signal->m_noOfSections;
80 }
81 
82 void
cleanup()83 AssembleBatchedFragments::cleanup()
84 {
85   require(m_section_memory != nullptr);
86   delete[] m_section_memory;
87   m_section_memory = nullptr;
88   m_size = 0;
89   m_sender_ref = 0;
90 }
91 
92 void
extract_signal_only(NdbApiSignal * signal)93 AssembleBatchedFragments::extract_signal_only(NdbApiSignal* signal)
94 {
95   require(m_section_memory == nullptr);
96 
97   NdbApiSignal sig{m_sigheader};
98   sig.setDataPtr(sig.getDataPtrSend());
99   std::memcpy(sig.getDataPtrSend(), m_theData, sig.theLength*4);
100 
101   *signal = sig;
102   signal->m_noOfSections = m_section_count;
103 }
104 
105 AssembleBatchedFragments::Result
do_assemble(const NdbApiSignal * signal,const LinearSectionPtr ptr[3])106 AssembleBatchedFragments::do_assemble(const NdbApiSignal* signal,
107                                       const LinearSectionPtr ptr[3])
108 {
109   if (signal->isFirstFragment())
110   {
111     m_sigheader = *signal; // slice intended
112     std::memcpy(m_theData, signal->getDataPtr(), signal->theLength * 4);
113     m_sigheader.theLength = signal->theLength - signal->m_noOfSections - 1;
114     m_sigheader.m_noOfSections = 0;
115     m_sender_ref = signal->theSendersBlockRef;
116     m_fragment_id = signal->getFragmentId();
117     m_offset = 0;
118     m_section_count = 0;
119     for (int i = 0; i < 3; i++)
120     {
121       m_section_offset[i] = 0;
122       m_section_size[i] = 0;
123     }
124   }
125 
126   const Uint32 sec_cnt = signal->m_noOfSections;
127   for (Uint32 sec_idx = 0; sec_idx < sec_cnt; sec_idx++)
128   {
129     const Uint32 sec_num = signal->getFragmentSectionNumber(sec_idx);
130 
131     require(sec_num < 3);
132     if (m_size - m_offset < ptr[sec_idx].sz)
133     {
134       // Drop collected section data
135       cleanup();
136       return Result::ERR_DATA_DROPPED; // No space left
137     }
138     if (m_section_size[sec_num] == 0)
139     {
140       require(m_section_offset[sec_num] == 0);
141       m_section_offset[sec_num] = m_offset;
142     }
143     std::memcpy(m_section_memory + m_offset, ptr[sec_idx].p, ptr[sec_idx].sz * 4);
144     m_offset += ptr[sec_idx].sz;
145     m_section_size[sec_num] += ptr[sec_idx].sz;
146   }
147   if (!signal->isLastFragment())
148   {
149     return Result::NEED_MORE;
150   }
151   if (m_offset != m_size)
152   {
153     // Drop collected section data
154     cleanup();
155     return Result::ERR_MESSAGE_INCOMPLETE;
156   }
157   return Result::MESSAGE_COMPLETE;
158 }
159