1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 // $Id: tdriver-dec.cpp 9210 2013-01-21 14:10:42Z rdempsey $
#include <cassert>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <vector>
using namespace std;

#include <boost/thread.hpp>
using namespace boost;

#include "primitivemsg.h"

#include "bytestream.h"
using namespace messageqcpp;

#include "distributedenginecomm.h"
using namespace joblist;
34
35 class TestDistributedEngineComm
36 {
37 public:
TestDistributedEngineComm(DistributedEngineComm * dec)38 TestDistributedEngineComm(DistributedEngineComm* dec) : fDec(dec) { }
addDataToOutput(const ByteStream & bs)39 void addDataToOutput(const ByteStream& bs)
40 {
41 fDec->addDataToOutput(bs);
42 }
43
44 private:
45 DistributedEngineComm* fDec;
46 };
47
48 namespace
49 {
50
buildBs(Int16 sessionId,Int16 stepId)51 const ByteStream buildBs(Int16 sessionId, Int16 stepId)
52 {
53 uint32_t len = sizeof(ISMPacketHeader) + 2 * sizeof(PrimitiveHeader);
54 ByteStream::byte* bpr = new ByteStream::byte[len];
55
56 ISMPacketHeader* hdr = reinterpret_cast<ISMPacketHeader*>(bpr);
57 PrimitiveHeader* p = reinterpret_cast<PrimitiveHeader*>(hdr + 1);
58
59 p->SessionID = sessionId;
60 p->StepID = stepId;
61
62 ByteStream bs(bpr, len);
63 delete [] bpr;
64 return bs;
65 }
66
readBs(const ByteStream & bs,Int16 & sessionId,Int16 & stepId)67 void readBs(const ByteStream& bs, Int16& sessionId, Int16& stepId)
68 {
69 const ISMPacketHeader* hdr = reinterpret_cast<const ISMPacketHeader*>(bs.buf());
70 const PrimitiveHeader* p = reinterpret_cast<const PrimitiveHeader*>(hdr + 1);
71 sessionId = p->SessionID;
72 stepId = p->StepID;
73 return;
74 }
75
76 class ThdFun1
77 {
78 public:
ThdFun1(DistributedEngineComm * dec,int sessionId,int stepId)79 ThdFun1(DistributedEngineComm* dec, int sessionId, int stepId) :
80 fDec(dec), fSessionId(sessionId), fStepId(stepId) { }
operator ()()81 void operator()()
82 {
83 ByteStream bs = fDec->read(fSessionId, fStepId);
84 idbassert(bs.length() == 0);
85 return;
86 }
87 private:
88 DistributedEngineComm* fDec;
89 int fSessionId;
90 int fStepId;
91 };
92
93 }
94
main(int argc,char ** argv)95 int main(int argc, char** argv)
96 {
97 int leakCheck = 0;
98
99 if (argc > 1 && strcmp(argv[1], "--leakcheck") == 0) leakCheck = 1;
100
101 DistributedEngineComm* dec;
102
103 dec = DistributedEngineComm::instance("./config-dec.xml");
104
105 dec->addSession(12345);
106 dec->addStep(12345, 0);
107 dec->addStep(12345, 1);
108 dec->addStep(12345, 3);
109 dec->addStep(12345, 10);
110
111 TestDistributedEngineComm tdec(dec);
112 ByteStream bs;
113
114 tdec.addDataToOutput(buildBs(12345, 0));
115 tdec.addDataToOutput(buildBs(12345, 1));
116 tdec.addDataToOutput(buildBs(12345, 3));
117 tdec.addDataToOutput(buildBs(12345, 10));
118
119 Int16 sessionId, stepId;
120 bs = dec->read(12345, 10);
121 readBs(bs, sessionId, stepId);
122 idbassert(sessionId == 12345);
123 idbassert(stepId == 10);
124
125 bs = dec->read(12345, 1);
126 readBs(bs, sessionId, stepId);
127 idbassert(sessionId == 12345);
128 idbassert(stepId == 1);
129
130 bs = dec->read(12345, 0);
131 readBs(bs, sessionId, stepId);
132 idbassert(sessionId == 12345);
133 idbassert(stepId == 0);
134
135 bs = dec->read(12345, 3);
136 readBs(bs, sessionId, stepId);
137 idbassert(sessionId == 12345);
138 idbassert(stepId == 3);
139
140 unsigned i;
141 bs = buildBs(12345, 1);
142 // 1M seems a bit too much for a dev box
143 // 500K is about the max
144 const unsigned loopMax = 200000 / (leakCheck * 99 + 1);
145
146 for (i = 0; i < loopMax; i++)
147 {
148 tdec.addDataToOutput(bs);
149 }
150
151 for (i = 0; i < loopMax; i++)
152 {
153 bs = dec->read(12345, 1);
154 readBs(bs, sessionId, stepId);
155 idbassert(sessionId == 12345);
156 idbassert(stepId == 1);
157 }
158
159 unsigned throws;
160 throws = 0;
161
162 for (i = 0; i < loopMax; i++)
163 {
164 bs = buildBs(12345, (i % 10));
165
166 //some of these shoud throw since there's only a few steps added
167 try
168 {
169 tdec.addDataToOutput(bs);
170 }
171 catch (runtime_error& re)
172 {
173 throws++;
174 continue;
175 }
176 }
177
178 idbassert(throws > 0);
179
180 throws = 0;
181
182 for (i = 0; i < loopMax; i++)
183 {
184 //some of these shoud throw since there's only a few steps added
185 try
186 {
187 bs = dec->read(12345, (i % 10));
188 }
189 catch (runtime_error& re)
190 {
191 throws++;
192 continue;
193 }
194
195 readBs(bs, sessionId, stepId);
196 idbassert(sessionId == 12345);
197 idbassert(stepId == (i % 10));
198 }
199
200 idbassert(throws > 0);
201
202 ThdFun1 fun1(dec, 12345, 1);
203 thread thd1(fun1);
204
205 dec->removeSession(12345);
206
207 thd1.join();
208
209 //delete dec;
210
211 return 0;
212 }
213
214