1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 // $Id: tdriver-dec.cpp 9210 2013-01-21 14:10:42Z rdempsey $
#include <cassert>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <vector>
using namespace std;

#include <boost/thread.hpp>
using namespace boost;

#include "primitivemsg.h"

#include "bytestream.h"
using namespace messageqcpp;

#include "distributedenginecomm.h"
using namespace joblist;
34 
35 class TestDistributedEngineComm
36 {
37 public:
TestDistributedEngineComm(DistributedEngineComm * dec)38     TestDistributedEngineComm(DistributedEngineComm* dec) : fDec(dec) { }
addDataToOutput(const ByteStream & bs)39     void addDataToOutput(const ByteStream& bs)
40     {
41         fDec->addDataToOutput(bs);
42     }
43 
44 private:
45     DistributedEngineComm* fDec;
46 };
47 
48 namespace
49 {
50 
buildBs(Int16 sessionId,Int16 stepId)51 const ByteStream buildBs(Int16 sessionId, Int16 stepId)
52 {
53     uint32_t len = sizeof(ISMPacketHeader) + 2 * sizeof(PrimitiveHeader);
54     ByteStream::byte* bpr = new ByteStream::byte[len];
55 
56     ISMPacketHeader* hdr = reinterpret_cast<ISMPacketHeader*>(bpr);
57     PrimitiveHeader* p = reinterpret_cast<PrimitiveHeader*>(hdr + 1);
58 
59     p->SessionID = sessionId;
60     p->StepID = stepId;
61 
62     ByteStream bs(bpr, len);
63     delete [] bpr;
64     return bs;
65 }
66 
readBs(const ByteStream & bs,Int16 & sessionId,Int16 & stepId)67 void readBs(const ByteStream& bs, Int16& sessionId, Int16& stepId)
68 {
69     const ISMPacketHeader* hdr = reinterpret_cast<const ISMPacketHeader*>(bs.buf());
70     const PrimitiveHeader* p = reinterpret_cast<const PrimitiveHeader*>(hdr + 1);
71     sessionId = p->SessionID;
72     stepId = p->StepID;
73     return;
74 }
75 
76 class ThdFun1
77 {
78 public:
ThdFun1(DistributedEngineComm * dec,int sessionId,int stepId)79     ThdFun1(DistributedEngineComm* dec, int sessionId, int stepId) :
80         fDec(dec), fSessionId(sessionId), fStepId(stepId) { }
operator ()()81     void operator()()
82     {
83         ByteStream bs = fDec->read(fSessionId, fStepId);
84         idbassert(bs.length() == 0);
85         return;
86     }
87 private:
88     DistributedEngineComm* fDec;
89     int fSessionId;
90     int fStepId;
91 };
92 
93 }
94 
main(int argc,char ** argv)95 int main(int argc, char** argv)
96 {
97     int leakCheck = 0;
98 
99     if (argc > 1 && strcmp(argv[1], "--leakcheck") == 0) leakCheck = 1;
100 
101     DistributedEngineComm* dec;
102 
103     dec = DistributedEngineComm::instance("./config-dec.xml");
104 
105     dec->addSession(12345);
106     dec->addStep(12345, 0);
107     dec->addStep(12345, 1);
108     dec->addStep(12345, 3);
109     dec->addStep(12345, 10);
110 
111     TestDistributedEngineComm tdec(dec);
112     ByteStream bs;
113 
114     tdec.addDataToOutput(buildBs(12345, 0));
115     tdec.addDataToOutput(buildBs(12345, 1));
116     tdec.addDataToOutput(buildBs(12345, 3));
117     tdec.addDataToOutput(buildBs(12345, 10));
118 
119     Int16 sessionId, stepId;
120     bs = dec->read(12345, 10);
121     readBs(bs, sessionId, stepId);
122     idbassert(sessionId == 12345);
123     idbassert(stepId == 10);
124 
125     bs = dec->read(12345, 1);
126     readBs(bs, sessionId, stepId);
127     idbassert(sessionId == 12345);
128     idbassert(stepId == 1);
129 
130     bs = dec->read(12345, 0);
131     readBs(bs, sessionId, stepId);
132     idbassert(sessionId == 12345);
133     idbassert(stepId == 0);
134 
135     bs = dec->read(12345, 3);
136     readBs(bs, sessionId, stepId);
137     idbassert(sessionId == 12345);
138     idbassert(stepId == 3);
139 
140     unsigned i;
141     bs = buildBs(12345, 1);
142     // 1M seems a bit too much for a dev box
143     // 500K is about the max
144     const unsigned loopMax = 200000 / (leakCheck * 99 + 1);
145 
146     for (i = 0; i < loopMax; i++)
147     {
148         tdec.addDataToOutput(bs);
149     }
150 
151     for (i = 0; i < loopMax; i++)
152     {
153         bs = dec->read(12345, 1);
154         readBs(bs, sessionId, stepId);
155         idbassert(sessionId == 12345);
156         idbassert(stepId == 1);
157     }
158 
159     unsigned throws;
160     throws = 0;
161 
162     for (i = 0; i < loopMax; i++)
163     {
164         bs = buildBs(12345, (i % 10));
165 
166         //some of these shoud throw since there's only a few steps added
167         try
168         {
169             tdec.addDataToOutput(bs);
170         }
171         catch (runtime_error& re)
172         {
173             throws++;
174             continue;
175         }
176     }
177 
178     idbassert(throws > 0);
179 
180     throws = 0;
181 
182     for (i = 0; i < loopMax; i++)
183     {
184         //some of these shoud throw since there's only a few steps added
185         try
186         {
187             bs = dec->read(12345, (i % 10));
188         }
189         catch (runtime_error& re)
190         {
191             throws++;
192             continue;
193         }
194 
195         readBs(bs, sessionId, stepId);
196         idbassert(sessionId == 12345);
197         idbassert(stepId == (i % 10));
198     }
199 
200     idbassert(throws > 0);
201 
202     ThdFun1 fun1(dec, 12345, 1);
203     thread thd1(fun1);
204 
205     dec->removeSession(12345);
206 
207     thd1.join();
208 
209     //delete dec;
210 
211     return 0;
212 }
213 
214