1 /*
2  * Copyright (C) 2009-2014 Codership Oy <info@codership.com>
3  *
4  * $Id$
5  */
6 
7 /*!
8  * @brief Check trace implementation
9  */
10 
11 #include "check_trace.hpp"
12 #include "gcomm/conf.hpp"
13 #include "gu_asio.hpp" // gu::ssl_register_params()
14 
15 using namespace std;
16 using namespace gu;
17 using namespace gcomm;
18 
19 struct CheckTraceConfInit
20 {
CheckTraceConfInitCheckTraceConfInit21     explicit CheckTraceConfInit(gu::Config& conf)
22     {
23         gu::ssl_register_params(conf);
24         gcomm::Conf::register_params(conf);
25     }
26 };
27 
check_trace_log_cb(int severity,const char * msg)28 extern "C" void check_trace_log_cb(int severity, const char* msg)
29 {
30     std::cerr << gu::datetime::Date::monotonic() << ": " << msg << "\n";
31 }
32 
33 
34 // This is to avoid static initialization fiasco with gcomm::Conf static members
35 // Ideally it is the latter which should be wrapped in a function, but, unless
36 // this is used to initialize another static object, it should be fine.
check_trace_conf()37 gu::Config& check_trace_conf()
38 {
39     static gu::Config conf;
40     static CheckTraceConfInit const check_trace_conf_init(conf);
41 
42     return conf;
43 }
44 
45 std::unique_ptr<Protonet> DummyTransport::net_;
get_net()46 Protonet& DummyTransport::get_net()
47 {
48     // Unit tests are single threaded, no need to worry about thread
49     // synchronization here.
50     if (not net_)
51         net_ = std::unique_ptr<Protonet>(Protonet::create(check_trace_conf()));;
52     return *net_;
53 }
54 
55 
operator <<(ostream & os,const TraceMsg & msg)56 ostream& gcomm::operator<<(ostream& os, const TraceMsg& msg)
57 {
58     return (os << "(" << msg.source() << "," << msg.source_view_id() << ","
59             << msg.seq() << ")");
60 }
61 
operator <<(ostream & os,const ViewTrace & vtr)62 ostream& gcomm::operator<<(ostream& os, const ViewTrace& vtr)
63 {
64     os << vtr.view() << ": ";
65     copy(vtr.msgs().begin(), vtr.msgs().end(),
66          ostream_iterator<const TraceMsg>(os, " "));
67     return os;
68 }
69 
operator <<(ostream & os,const Trace & tr)70 ostream& gcomm::operator<<(ostream& os, const Trace& tr)
71 {
72     os << "trace: \n";
73     os << tr.view_traces();
74     return os;
75 }
76 
operator <<(ostream & os,const Channel & ch)77 ostream& gcomm::operator<<(ostream& os, const Channel& ch)
78 {
79     return (os << "(" << ch.latency() << "," << ch.loss() << ")");
80 }
81 
operator <<(ostream & os,const Channel * chp)82 ostream& gcomm::operator<<(ostream& os, const Channel* chp)
83 {
84     return (os << *chp);
85 }
86 
operator <<(ostream & os,const MatrixElem & me)87 ostream& gcomm::operator<<(ostream& os, const MatrixElem& me)
88 {
89     return (os << "(" << me.ii() << "," << me.jj() << ")");
90 }
91 
operator <<(ostream & os,const PropagationMatrix & prop)92 ostream& gcomm::operator<<(ostream& os, const PropagationMatrix& prop)
93 {
94     os << "(";
95     copy(prop.prop_.begin(), prop.prop_.end(),
96          ostream_iterator<const ChannelMap::value_type>(os, ","));
97     os << ")";
98     return os;
99 }
100 
101 
102 
103 class LinkOp
104 {
105 public:
LinkOp(DummyNode & node,ChannelMap & prop)106     LinkOp(DummyNode& node, ChannelMap& prop) :
107         node_(node), prop_(prop) { }
108 
operator ()(NodeMap::value_type & l)109     void operator()(NodeMap::value_type& l)
110     {
111         if (NodeMap::key(l) != node_.index())
112         {
113             ChannelMap::iterator ii;
114             gu_trace(ii = prop_.insert_unique(
115                          make_pair(MatrixElem(node_.index(),
116                                               NodeMap::key(l)),
117                                    new Channel(check_trace_conf()))));
118             gcomm::connect(ChannelMap::value(ii), node_.protos().front());
119             gu_trace(ii = prop_.insert_unique(
120                          make_pair(MatrixElem(NodeMap::key(l),
121                                               node_.index()),
122                                    new Channel(check_trace_conf()))));
123             gcomm::connect(ChannelMap::value(ii),
124                            NodeMap::value(l)->protos().front());
125         }
126     }
127 private:
128     DummyNode& node_;
129     ChannelMap& prop_;
130 };
131 
132 
133 
134 class PropagateOp
135 {
136 public:
PropagateOp(NodeMap & tp)137     PropagateOp(NodeMap& tp) : tp_(tp) { }
138 
operator ()(ChannelMap::value_type & vt)139     void operator()(ChannelMap::value_type& vt)
140     {
141         ChannelMsg cmsg(vt.second->get());
142         if (cmsg.rb().len() != 0)
143         {
144             NodeMap::iterator i(tp_.find(vt.first.jj()));
145             gcomm_assert(i != tp_.end());
146             gu_trace(NodeMap::value(i)->protos().front()->handle_up(
147                          &tp_, cmsg.rb(),
148                          ProtoUpMeta(cmsg.source())));
149         }
150     }
151 private:
152     NodeMap& tp_;
153 };
154 
155 
156 class ExpireTimersOp
157 {
158 public:
ExpireTimersOp()159     ExpireTimersOp() { }
operator ()(NodeMap::value_type & vt)160     void operator()(NodeMap::value_type& vt)
161     {
162         NodeMap::value(vt)->handle_timers();
163     }
164 };
165 
put(const Datagram & rb,const UUID & source)166 void gcomm::Channel::put(const Datagram& rb, const UUID& source)
167 {
168     Datagram dg(rb);
169 //    if (dg.is_normalized() == false)
170     //  {
171     //   dg.normalize();
172     // }
173     queue_.push_back(make_pair(latency_, ChannelMsg(dg, source)));
174 }
175 
get()176 ChannelMsg gcomm::Channel::get()
177 {
178     while (queue_.empty() == false)
179     {
180         pair<size_t, ChannelMsg>& p(queue_.front());
181         if (p.first == 0)
182         {
183             // todo: packet loss goes here
184             if (loss() < 1.)
185             {
186                 double rnd(double(rand())/double(RAND_MAX));
187                 if (loss() < rnd)
188                 {
189                     queue_.pop_front();
190                     return ChannelMsg(Datagram(), UUID::nil());
191                 }
192             }
193             ChannelMsg ret(p.second);
194             queue_.pop_front();
195             return ret;
196         }
197         else
198         {
199             --p.first;
200             return ChannelMsg(Datagram(), UUID::nil());
201         }
202     }
203     return ChannelMsg(Datagram(), UUID::nil());
204 }
205 
~PropagationMatrix()206 gcomm::PropagationMatrix::~PropagationMatrix()
207 {
208     for_each(prop_.begin(), prop_.end(), ChannelMap::DeleteObject());
209 }
210 
insert_tp(DummyNode * t)211 void gcomm::PropagationMatrix::insert_tp(DummyNode* t)
212 {
213     gu_trace(tp_.insert_unique(make_pair(t->index(), t)));
214     for_each(tp_.begin(), tp_.end(), LinkOp(*t, prop_));
215 }
216 
217 
set_latency(const size_t ii,const size_t jj,const size_t lat)218 void gcomm::PropagationMatrix::set_latency(const size_t ii, const size_t jj,
219                                            const size_t lat)
220 {
221     ChannelMap::iterator i;
222     gu_trace(i = prop_.find_checked(MatrixElem(ii, jj)));
223     ChannelMap::value(i)->set_latency(lat);
224 }
225 
226 
set_loss(const size_t ii,const size_t jj,const double loss)227 void gcomm::PropagationMatrix::set_loss(const size_t ii, const size_t jj,
228                                         const double loss)
229 {
230     ChannelMap::iterator i;
231     gu_trace(i = prop_.find_checked(MatrixElem(ii, jj)));
232     ChannelMap::value(i)->set_loss(loss);
233 }
234 
split(const size_t ii,const size_t jj)235 void gcomm::PropagationMatrix::split(const size_t ii, const size_t jj)
236 {
237     set_loss(ii, jj, 0.);
238     set_loss(jj, ii, 0.);
239 }
240 
241 
merge(const size_t ii,const size_t jj,const double loss)242 void gcomm::PropagationMatrix::merge(const size_t ii, const size_t jj, const double loss)
243 {
244     set_loss(ii, jj, loss);
245     set_loss(jj, ii, loss);
246 }
247 
248 
expire_timers()249 void gcomm::PropagationMatrix::expire_timers()
250 {
251     for_each(tp_.begin(), tp_.end(), ExpireTimersOp());
252 }
253 
254 
propagate_n(size_t n)255 void gcomm::PropagationMatrix::propagate_n(size_t n)
256 {
257     while (n-- > 0)
258     {
259         for_each(prop_.begin(), prop_.end(), PropagateOp(tp_));
260     }
261 }
262 
263 
propagate_until_empty()264 void gcomm::PropagationMatrix::propagate_until_empty()
265 {
266     do
267     {
268         for_each(prop_.begin(), prop_.end(), PropagateOp(tp_));
269     }
270     while (count_channel_msgs() > 0);
271 }
272 
273 
propagate_until_cvi(bool handle_timers)274 void gcomm::PropagationMatrix::propagate_until_cvi(bool handle_timers)
275 {
276     bool all_in = false;
277     do
278     {
279         propagate_n(10);
280         all_in = all_in_cvi();
281         if (all_in == false && handle_timers == true)
282         {
283             expire_timers();
284         }
285         if (handle_timers)
286         {
287             // Assume that time progresses in 50 millisecond intervals
288             // and that is fine enough granularity for all tests
289             // which deal with timers.
290             gu::datetime::SimClock::inc_time(50*gu::datetime::MSec);
291         }
292     }
293     while (all_in == false);
294 }
295 
296 
count_channel_msgs() const297 size_t gcomm::PropagationMatrix::count_channel_msgs() const
298 {
299     size_t ret = 0;
300     for (ChannelMap::const_iterator i = prop_.begin();
301          i != prop_.end(); ++i)
302     {
303         ret += ChannelMap::value(i)->n_msgs();
304     }
305     return ret;
306 }
307 
308 
all_in_cvi() const309 bool gcomm::PropagationMatrix::all_in_cvi() const
310 {
311     for (std::map<size_t, DummyNode*>::const_iterator i = tp_.begin();
312          i != tp_.end(); ++i)
313     {
314         if (i->second->in_cvi() == false)
315         {
316             return false;
317         }
318     }
319     return true;
320 }
321 
/*!
 * Cross-checks two traces against each other.
 *
 * Walks the view traces of t1 and, for every key which is also present in
 * the view traces of t2, compares the two entries. Only entries whose key
 * type is V_TRANS or V_REG are checked; the comparison also looks at the
 * neighbouring (next and previous) entries of both maps. Keys present in
 * t1 only are skipped.
 *
 * Violations are reported through gcomm_assert, with both entries (and
 * the neighbouring entries where relevant) streamed into the message.
 * NOTE(review): gcomm_assert is defined outside of this file; it is
 * presumably fatal when the condition does not hold - confirm.
 *
 * @param t1 trace whose view traces drive the iteration
 * @param t2 trace in which matching view traces are looked up
 */
static void check_traces(const Trace& t1, const Trace& t2)
{
    for (Trace::ViewTraceMap::const_iterator i = t1.view_traces().begin();
         i != t1.view_traces().end();  ++i)
    {
        // Entry of t2 stored under the same key, if there is one.
        Trace::ViewTraceMap::const_iterator j =
                t2.view_traces().find(Trace::ViewTraceMap::key(i));
        if (j == t2.view_traces().end()) continue;

        ViewType type = i->first.type();
        // @todo Proper checks for PRIM and NON_PRIM
        if (type == V_TRANS || type == V_REG) {
            // Entries following i and j in their respective maps; either
            // may be the end iterator and is checked before use.
            Trace::ViewTraceMap::const_iterator i_next(i); ++i_next;
            Trace::ViewTraceMap::const_iterator j_next(j); ++j_next;

            if (type == V_TRANS) {
                // if next reg view is same, then views and msgs are the same.
                if (i_next != t1.view_traces().end() &&
                    j_next != t2.view_traces().end() &&
                    i_next->first == j_next->first) {
                    gcomm_assert(*i == *j) <<
                            "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
                            "next views: \n\n" << *i_next << "\n\n" << *j_next;
                }
            }

            if (type == V_REG) {
                // members are the same at all times.
                gcomm_assert(i->second.view().members() == j->second.view().members()) <<
                        "trace differ: \n\n" << *i << "\n\n" << *j;

                // if next trans view has same members, then msgs are the same.
                if (i_next != t1.view_traces().end() &&
                    j_next != t2.view_traces().end()) {
                    if (i_next->second.view().members() ==
                        j_next->second.view().members()) {
                        gcomm_assert(i->second.msgs() == j->second.msgs()) <<
                                "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
                                "next views: \n\n" << *i_next << "\n\n" << *j_next;
                    } else {
                        // if not, then members should be disjoint.
                        // The common members are collected into output,
                        // which is then required to be empty.
                        std::map<gcomm::UUID, gcomm::Node> output;
                        std::set_intersection(i_next->second.view().members().begin(),
                                              i_next->second.view().members().end(),
                                              j_next->second.view().members().begin(),
                                              j_next->second.view().members().end(),
                                              std::inserter(output,output.begin()));
                        gcomm_assert(output.size() == 0) <<
                                "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
                                "next views: \n\n" << *i_next << "\n\n" << *j_next;
                    }
                }
                // if previous trans view id is the same.
                // the reg view should be the same.

                // if previous trans view id is not same.
                // intersections of joined, left, partitioned sets are empty.

                // No previous entry on at least one side: nothing more to
                // check for this key.
                if (i == t1.view_traces().begin() ||
                    j == t2.view_traces().begin()) continue;
                Trace::ViewTraceMap::const_iterator i_prev(i); --i_prev;
                Trace::ViewTraceMap::const_iterator j_prev(j); --j_prev;

                if (i_prev->first == j_prev->first) {
                    gcomm_assert(i->second.view() == j->second.view()) <<
                            "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
                            "previous views: \n\n" << *i_prev << "\n\n" << *j_prev;
                } else {
                    // output is reused as scratch space for the three
                    // intersections; only their sizes are kept.
                    std::map<gcomm::UUID, gcomm::Node> output;
                    int joined_size = 0, left_size = 0, part_size = 0;
                    std::set_intersection(i->second.view().joined().begin(),
                                          i->second.view().joined().end(),
                                          j->second.view().joined().begin(),
                                          j->second.view().joined().end(),
                                          std::inserter(output, output.begin()));
                    joined_size = output.size();
                    output.clear();

                    std::set_intersection(i->second.view().left().begin(),
                                          i->second.view().left().end(),
                                          j->second.view().left().begin(),
                                          j->second.view().left().end(),
                                          std::inserter(output, output.begin()));
                    left_size = output.size();
                    output.clear();

                    std::set_intersection(i->second.view().partitioned().begin(),
                                          i->second.view().partitioned().end(),
                                          j->second.view().partitioned().begin(),
                                          j->second.view().partitioned().end(),
                                          std::inserter(output, output.begin()));
                    part_size = output.size();
                    output.clear();

                    // Members must still match, and none of the joined,
                    // left and partitioned sets may overlap.
                    gcomm_assert(i->second.view().members() == j->second.view().members() &&
                                 joined_size == 0 && left_size == 0 && part_size == 0) <<
                            "trace differ: \n\n" << *i << "\n\n" << *j << "\n\n"
                            "previous views: \n\n" << *i_prev << "\n\n" << *j_prev;
                }
            }
        }
    }
}
425 
426 class CheckTraceOp
427 {
428 public:
CheckTraceOp(const vector<DummyNode * > & nvec)429     CheckTraceOp(const vector<DummyNode*>& nvec) : nvec_(nvec) { }
430 
operator ()(const DummyNode * n) const431     void operator()(const DummyNode* n) const
432     {
433         for (vector<DummyNode*>::const_iterator i = nvec_.begin();
434              i != nvec_.end();
435              ++i)
436         {
437             if ((*i)->index() != n->index())
438             {
439                 gu_trace(check_traces((*i)->trace(), n->trace()));
440             }
441         }
442     }
443 private:
444     const vector<DummyNode*>& nvec_;
445 };
446 
447 
check_trace(const vector<DummyNode * > & nvec)448 void gcomm::check_trace(const vector<DummyNode*>& nvec)
449 {
450     for_each(nvec.begin(), nvec.end(), CheckTraceOp(nvec));
451 }
452