1 /*
2 * Copyright (C) 2009-2014 Codership Oy <info@codership.com>
3 *
4 * $Id$
5 */
6
7 /*!
8 * @brief Check trace implementation
9 */
10
11 #include "check_trace.hpp"
12 #include "gcomm/conf.hpp"
13 #include "gu_asio.hpp" // gu::ssl_register_params()
14
15 using namespace std;
16 using namespace gu;
17 using namespace gcomm;
18
19 struct CheckTraceConfInit
20 {
CheckTraceConfInitCheckTraceConfInit21 explicit CheckTraceConfInit(gu::Config& conf)
22 {
23 gu::ssl_register_params(conf);
24 gcomm::Conf::register_params(conf);
25 }
26 };
27
check_trace_log_cb(int severity,const char * msg)28 extern "C" void check_trace_log_cb(int severity, const char* msg)
29 {
30 std::cerr << gu::datetime::Date::monotonic() << ": " << msg << "\n";
31 }
32
33
34 // This is to avoid static initialization fiasco with gcomm::Conf static members
35 // Ideally it is the latter which should be wrapped in a function, but, unless
36 // this is used to initialize another static object, it should be fine.
check_trace_conf()37 gu::Config& check_trace_conf()
38 {
39 static gu::Config conf;
40 static CheckTraceConfInit const check_trace_conf_init(conf);
41
42 return conf;
43 }
44
45 std::unique_ptr<Protonet> DummyTransport::net_;
get_net()46 Protonet& DummyTransport::get_net()
47 {
48 // Unit tests are single threaded, no need to worry about thread
49 // synchronization here.
50 if (not net_)
51 net_ = std::unique_ptr<Protonet>(Protonet::create(check_trace_conf()));;
52 return *net_;
53 }
54
55
operator <<(ostream & os,const TraceMsg & msg)56 ostream& gcomm::operator<<(ostream& os, const TraceMsg& msg)
57 {
58 return (os << "(" << msg.source() << "," << msg.source_view_id() << ","
59 << msg.seq() << ")");
60 }
61
operator <<(ostream & os,const ViewTrace & vtr)62 ostream& gcomm::operator<<(ostream& os, const ViewTrace& vtr)
63 {
64 os << vtr.view() << ": ";
65 copy(vtr.msgs().begin(), vtr.msgs().end(),
66 ostream_iterator<const TraceMsg>(os, " "));
67 return os;
68 }
69
operator <<(ostream & os,const Trace & tr)70 ostream& gcomm::operator<<(ostream& os, const Trace& tr)
71 {
72 os << "trace: \n";
73 os << tr.view_traces();
74 return os;
75 }
76
operator <<(ostream & os,const Channel & ch)77 ostream& gcomm::operator<<(ostream& os, const Channel& ch)
78 {
79 return (os << "(" << ch.latency() << "," << ch.loss() << ")");
80 }
81
operator <<(ostream & os,const Channel * chp)82 ostream& gcomm::operator<<(ostream& os, const Channel* chp)
83 {
84 return (os << *chp);
85 }
86
operator <<(ostream & os,const MatrixElem & me)87 ostream& gcomm::operator<<(ostream& os, const MatrixElem& me)
88 {
89 return (os << "(" << me.ii() << "," << me.jj() << ")");
90 }
91
operator <<(ostream & os,const PropagationMatrix & prop)92 ostream& gcomm::operator<<(ostream& os, const PropagationMatrix& prop)
93 {
94 os << "(";
95 copy(prop.prop_.begin(), prop.prop_.end(),
96 ostream_iterator<const ChannelMap::value_type>(os, ","));
97 os << ")";
98 return os;
99 }
100
101
102
103 class LinkOp
104 {
105 public:
LinkOp(DummyNode & node,ChannelMap & prop)106 LinkOp(DummyNode& node, ChannelMap& prop) :
107 node_(node), prop_(prop) { }
108
operator ()(NodeMap::value_type & l)109 void operator()(NodeMap::value_type& l)
110 {
111 if (NodeMap::key(l) != node_.index())
112 {
113 ChannelMap::iterator ii;
114 gu_trace(ii = prop_.insert_unique(
115 make_pair(MatrixElem(node_.index(),
116 NodeMap::key(l)),
117 new Channel(check_trace_conf()))));
118 gcomm::connect(ChannelMap::value(ii), node_.protos().front());
119 gu_trace(ii = prop_.insert_unique(
120 make_pair(MatrixElem(NodeMap::key(l),
121 node_.index()),
122 new Channel(check_trace_conf()))));
123 gcomm::connect(ChannelMap::value(ii),
124 NodeMap::value(l)->protos().front());
125 }
126 }
127 private:
128 DummyNode& node_;
129 ChannelMap& prop_;
130 };
131
132
133
134 class PropagateOp
135 {
136 public:
PropagateOp(NodeMap & tp)137 PropagateOp(NodeMap& tp) : tp_(tp) { }
138
operator ()(ChannelMap::value_type & vt)139 void operator()(ChannelMap::value_type& vt)
140 {
141 ChannelMsg cmsg(vt.second->get());
142 if (cmsg.rb().len() != 0)
143 {
144 NodeMap::iterator i(tp_.find(vt.first.jj()));
145 gcomm_assert(i != tp_.end());
146 gu_trace(NodeMap::value(i)->protos().front()->handle_up(
147 &tp_, cmsg.rb(),
148 ProtoUpMeta(cmsg.source())));
149 }
150 }
151 private:
152 NodeMap& tp_;
153 };
154
155
156 class ExpireTimersOp
157 {
158 public:
ExpireTimersOp()159 ExpireTimersOp() { }
operator ()(NodeMap::value_type & vt)160 void operator()(NodeMap::value_type& vt)
161 {
162 NodeMap::value(vt)->handle_timers();
163 }
164 };
165
put(const Datagram & rb,const UUID & source)166 void gcomm::Channel::put(const Datagram& rb, const UUID& source)
167 {
168 Datagram dg(rb);
169 // if (dg.is_normalized() == false)
170 // {
171 // dg.normalize();
172 // }
173 queue_.push_back(make_pair(latency_, ChannelMsg(dg, source)));
174 }
175
get()176 ChannelMsg gcomm::Channel::get()
177 {
178 while (queue_.empty() == false)
179 {
180 pair<size_t, ChannelMsg>& p(queue_.front());
181 if (p.first == 0)
182 {
183 // todo: packet loss goes here
184 if (loss() < 1.)
185 {
186 double rnd(double(rand())/double(RAND_MAX));
187 if (loss() < rnd)
188 {
189 queue_.pop_front();
190 return ChannelMsg(Datagram(), UUID::nil());
191 }
192 }
193 ChannelMsg ret(p.second);
194 queue_.pop_front();
195 return ret;
196 }
197 else
198 {
199 --p.first;
200 return ChannelMsg(Datagram(), UUID::nil());
201 }
202 }
203 return ChannelMsg(Datagram(), UUID::nil());
204 }
205
~PropagationMatrix()206 gcomm::PropagationMatrix::~PropagationMatrix()
207 {
208 for_each(prop_.begin(), prop_.end(), ChannelMap::DeleteObject());
209 }
210
insert_tp(DummyNode * t)211 void gcomm::PropagationMatrix::insert_tp(DummyNode* t)
212 {
213 gu_trace(tp_.insert_unique(make_pair(t->index(), t)));
214 for_each(tp_.begin(), tp_.end(), LinkOp(*t, prop_));
215 }
216
217
set_latency(const size_t ii,const size_t jj,const size_t lat)218 void gcomm::PropagationMatrix::set_latency(const size_t ii, const size_t jj,
219 const size_t lat)
220 {
221 ChannelMap::iterator i;
222 gu_trace(i = prop_.find_checked(MatrixElem(ii, jj)));
223 ChannelMap::value(i)->set_latency(lat);
224 }
225
226
set_loss(const size_t ii,const size_t jj,const double loss)227 void gcomm::PropagationMatrix::set_loss(const size_t ii, const size_t jj,
228 const double loss)
229 {
230 ChannelMap::iterator i;
231 gu_trace(i = prop_.find_checked(MatrixElem(ii, jj)));
232 ChannelMap::value(i)->set_loss(loss);
233 }
234
split(const size_t ii,const size_t jj)235 void gcomm::PropagationMatrix::split(const size_t ii, const size_t jj)
236 {
237 set_loss(ii, jj, 0.);
238 set_loss(jj, ii, 0.);
239 }
240
241
merge(const size_t ii,const size_t jj,const double loss)242 void gcomm::PropagationMatrix::merge(const size_t ii, const size_t jj, const double loss)
243 {
244 set_loss(ii, jj, loss);
245 set_loss(jj, ii, loss);
246 }
247
248
expire_timers()249 void gcomm::PropagationMatrix::expire_timers()
250 {
251 for_each(tp_.begin(), tp_.end(), ExpireTimersOp());
252 }
253
254
propagate_n(size_t n)255 void gcomm::PropagationMatrix::propagate_n(size_t n)
256 {
257 while (n-- > 0)
258 {
259 for_each(prop_.begin(), prop_.end(), PropagateOp(tp_));
260 }
261 }
262
263
propagate_until_empty()264 void gcomm::PropagationMatrix::propagate_until_empty()
265 {
266 do
267 {
268 for_each(prop_.begin(), prop_.end(), PropagateOp(tp_));
269 }
270 while (count_channel_msgs() > 0);
271 }
272
273
propagate_until_cvi(bool handle_timers)274 void gcomm::PropagationMatrix::propagate_until_cvi(bool handle_timers)
275 {
276 bool all_in = false;
277 do
278 {
279 propagate_n(10);
280 all_in = all_in_cvi();
281 if (all_in == false && handle_timers == true)
282 {
283 expire_timers();
284 }
285 if (handle_timers)
286 {
287 // Assume that time progresses in 50 millisecond intervals
288 // and that is fine enough granularity for all tests
289 // which deal with timers.
290 gu::datetime::SimClock::inc_time(50*gu::datetime::MSec);
291 }
292 }
293 while (all_in == false);
294 }
295
296
count_channel_msgs() const297 size_t gcomm::PropagationMatrix::count_channel_msgs() const
298 {
299 size_t ret = 0;
300 for (ChannelMap::const_iterator i = prop_.begin();
301 i != prop_.end(); ++i)
302 {
303 ret += ChannelMap::value(i)->n_msgs();
304 }
305 return ret;
306 }
307
308
all_in_cvi() const309 bool gcomm::PropagationMatrix::all_in_cvi() const
310 {
311 for (std::map<size_t, DummyNode*>::const_iterator i = tp_.begin();
312 i != tp_.end(); ++i)
313 {
314 if (i->second->in_cvi() == false)
315 {
316 return false;
317 }
318 }
319 return true;
320 }
321
check_traces(const Trace & t1,const Trace & t2)322 static void check_traces(const Trace& t1, const Trace& t2)
323 {
324 for (Trace::ViewTraceMap::const_iterator i = t1.view_traces().begin();
325 i != t1.view_traces().end(); ++i)
326 {
327 Trace::ViewTraceMap::const_iterator j =
328 t2.view_traces().find(Trace::ViewTraceMap::key(i));
329 if (j == t2.view_traces().end()) continue;
330
331 ViewType type = i->first.type();
332 // @todo Proper checks for PRIM and NON_PRIM
333 if (type == V_TRANS || type == V_REG) {
334 Trace::ViewTraceMap::const_iterator i_next(i); ++i_next;
335 Trace::ViewTraceMap::const_iterator j_next(j); ++j_next;
336
337 if (type == V_TRANS) {
338 // if next reg view is same, then views and msgs are the same.
339 if (i_next != t1.view_traces().end() &&
340 j_next != t2.view_traces().end() &&
341 i_next->first == j_next->first) {
342 gcomm_assert(*i == *j) <<
343 "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
344 "next views: \n\n" << *i_next << "\n\n" << *j_next;
345 }
346 }
347
348 if (type == V_REG) {
349 // members are same all the times.
350 gcomm_assert(i->second.view().members() == j->second.view().members()) <<
351 "trace differ: \n\n" << *i << "\n\n" << *j;
352
353 // if next trans view has same members, then msgs are the same.
354 if (i_next != t1.view_traces().end() &&
355 j_next != t2.view_traces().end()) {
356 if (i_next->second.view().members() ==
357 j_next->second.view().members()) {
358 gcomm_assert(i->second.msgs() == j->second.msgs()) <<
359 "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
360 "next views: \n\n" << *i_next << "\n\n" << *j_next;
361 } else {
362 // if not, then members should be disjoint.
363 std::map<gcomm::UUID, gcomm::Node> output;
364 std::set_intersection(i_next->second.view().members().begin(),
365 i_next->second.view().members().end(),
366 j_next->second.view().members().begin(),
367 j_next->second.view().members().end(),
368 std::inserter(output,output.begin()));
369 gcomm_assert(output.size() == 0) <<
370 "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
371 "next views: \n\n" << *i_next << "\n\n" << *j_next;
372 }
373 }
374 // if previous trans view id is the same.
375 // the reg view should be the same.
376
377 // if previous trans view id is not same.
378 // intersections of joined, left, partitioned sets are empty.
379
380 if (i == t1.view_traces().begin() ||
381 j == t2.view_traces().begin()) continue;
382 Trace::ViewTraceMap::const_iterator i_prev(i); --i_prev;
383 Trace::ViewTraceMap::const_iterator j_prev(j); --j_prev;
384
385 if (i_prev->first == j_prev->first) {
386 gcomm_assert(i->second.view() == j->second.view()) <<
387 "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
388 "previous views: \n\n" << *i_prev << "\n\n" << *j_prev;
389 } else {
390 std::map<gcomm::UUID, gcomm::Node> output;
391 int joined_size = 0, left_size = 0, part_size = 0;
392 std::set_intersection(i->second.view().joined().begin(),
393 i->second.view().joined().end(),
394 j->second.view().joined().begin(),
395 j->second.view().joined().end(),
396 std::inserter(output, output.begin()));
397 joined_size = output.size();
398 output.clear();
399
400 std::set_intersection(i->second.view().left().begin(),
401 i->second.view().left().end(),
402 j->second.view().left().begin(),
403 j->second.view().left().end(),
404 std::inserter(output, output.begin()));
405 left_size = output.size();
406 output.clear();
407
408 std::set_intersection(i->second.view().partitioned().begin(),
409 i->second.view().partitioned().end(),
410 j->second.view().partitioned().begin(),
411 j->second.view().partitioned().end(),
412 std::inserter(output, output.begin()));
413 part_size = output.size();
414 output.clear();
415
416 gcomm_assert(i->second.view().members() == j->second.view().members() &&
417 joined_size == 0 && left_size == 0 && part_size == 0) <<
418 "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
419 "previous views: \n\n" << *i_prev << "\n\n" << *j_prev;
420 }
421 }
422 }
423 }
424 }
425
426 class CheckTraceOp
427 {
428 public:
CheckTraceOp(const vector<DummyNode * > & nvec)429 CheckTraceOp(const vector<DummyNode*>& nvec) : nvec_(nvec) { }
430
operator ()(const DummyNode * n) const431 void operator()(const DummyNode* n) const
432 {
433 for (vector<DummyNode*>::const_iterator i = nvec_.begin();
434 i != nvec_.end();
435 ++i)
436 {
437 if ((*i)->index() != n->index())
438 {
439 gu_trace(check_traces((*i)->trace(), n->trace()));
440 }
441 }
442 }
443 private:
444 const vector<DummyNode*>& nvec_;
445 };
446
447
check_trace(const vector<DummyNode * > & nvec)448 void gcomm::check_trace(const vector<DummyNode*>& nvec)
449 {
450 for_each(nvec.begin(), nvec.end(), CheckTraceOp(nvec));
451 }
452