1 /*
2  * Copyright (C) 2009-2014 Codership Oy <info@codership.com>
3  *
4  * $Id$
5  */
6 
7 /*!
8  * Classes for tracing views and messages
9  */
10 
11 #include "gu_uri.hpp"
12 #include "gu_datetime.hpp"
13 
14 #include "gcomm/datagram.hpp"
15 #include "gcomm/uuid.hpp"
16 #include "gcomm/protolay.hpp"
17 #include "gcomm/protostack.hpp"
18 #include "gcomm/transport.hpp"
19 #include "gcomm/map.hpp"
20 #include "gcomm/util.hpp"
21 
22 #include <vector>
23 #include <deque>
24 #include <functional>
25 
26 gu::Config& check_trace_conf();
27 
28 extern "C" void check_trace_log_cb(int, const char*);
29 
30 namespace gcomm
31 {
32     class TraceMsg
33     {
34     public:
TraceMsg(const UUID & source=UUID::nil (),const ViewId & source_view_id=ViewId (),const int64_t seq=-1)35         TraceMsg(const UUID& source           = UUID::nil(),
36                  const ViewId& source_view_id = ViewId(),
37                  const int64_t seq            = -1) :
38             source_(source),
39             source_view_id_(source_view_id),
40             seq_(seq)
41         { }
42 
source() const43         const UUID& source() const { return source_; }
44 
source_view_id() const45         const ViewId& source_view_id() const { return source_view_id_; }
46 
seq() const47         int64_t seq() const { return seq_; }
48 
operator ==(const TraceMsg & cmp) const49         bool operator==(const TraceMsg& cmp) const
50         {
51             return (source_         == cmp.source_         &&
52                     source_view_id_ == cmp.source_view_id_ &&
53                     seq_            == cmp.seq_              );
54 
55         }
56 
57     private:
58         UUID    source_;
59         ViewId  source_view_id_;
60         int64_t seq_;
61     };
62 
63     std::ostream& operator<<(std::ostream& os, const TraceMsg& msg);
64 
65     class ViewTrace
66     {
67     public:
ViewTrace(const View & view)68         ViewTrace(const View& view) : view_(view), msgs_() { }
69 
insert_msg(const TraceMsg & msg)70         void insert_msg(const TraceMsg& msg)
71         {
72             switch (view_.type())
73             {
74             case V_REG:
75                 gcomm_assert(view_.id() == msg.source_view_id());
76                 gcomm_assert(contains(msg.source()) == true)
77                     << "msg source " << msg.source() << " not int view "
78                     << view_;
79                 break;
80             case V_TRANS:
81                 gcomm_assert(view_.id().uuid() ==
82                              msg.source_view_id().uuid() &&
83                              view_.id().seq() ==
84                              msg.source_view_id().seq());
85                 break;
86             case V_NON_PRIM:
87                 break;
88             case V_PRIM:
89                 gcomm_assert(view_.id() == msg.source_view_id())
90                     << " view id " << view_.id()
91                     <<  " source view " << msg.source_view_id();
92                 gcomm_assert(contains(msg.source()) == true);
93                 break;
94             case V_NONE:
95                 gu_throw_fatal;
96                 break;
97             }
98 
99             if (view_.type() != V_NON_PRIM)
100             {
101                 msgs_.push_back(msg);
102             }
103         }
104 
view() const105         const View& view() const { return view_; }
106 
msgs() const107         const std::deque<TraceMsg>& msgs() const { return msgs_; }
108 
operator ==(const ViewTrace & cmp) const109         bool operator==(const ViewTrace& cmp) const
110         {
111             // Note: Cannot compare joining members since seen differently
112             // on different merging subsets
113             return (view_.members()     == cmp.view_.members()     &&
114                     view_.left()        == cmp.view_.left()        &&
115                     view_.partitioned() == cmp.view_.partitioned() &&
116                     msgs_              == cmp.msgs_                     );
117         }
118     private:
119 
contains(const UUID & uuid) const120         bool contains(const UUID& uuid) const
121         {
122             return (view_.members().find(uuid) != view_.members().end() ||
123                     view_.left().find(uuid)    != view_.left().end() ||
124                     view_.partitioned().find(uuid) !=view_.partitioned().end());
125         }
126 
127         View       view_;
128         std::deque<TraceMsg> msgs_;
129     };
130 
131     std::ostream& operator<<(std::ostream& os, const ViewTrace& vtr);
132 
133 
134     class Trace
135     {
136     public:
137         class ViewTraceMap : public Map<ViewId, ViewTrace> { };
138 
Trace()139         Trace() : views_(), current_view_(views_.end()) { }
140 
insert_view(const View & view)141         void insert_view(const View& view)
142         {
143             gu_trace(current_view_ = views_.insert_unique(
144                          std::make_pair(view.id(), ViewTrace(view))));
145 
146             log_debug << view;
147         }
insert_msg(const TraceMsg & msg)148         void insert_msg(const TraceMsg& msg)
149         {
150             gcomm_assert(current_view_ != views_.end())
151                 << "no view set before msg delivery";
152             gu_trace(ViewTraceMap::value(current_view_).insert_msg(msg));
153         }
view_traces() const154         const ViewTraceMap& view_traces() const { return views_; }
155 
current_view_trace() const156         const ViewTrace& current_view_trace() const
157         {
158             gcomm_assert(current_view_ != views_.end());
159             return ViewTraceMap::value(current_view_);
160         }
161 
162     private:
163         ViewTraceMap views_;
164         ViewTraceMap::iterator current_view_;
165     };
166 
167 
168     std::ostream& operator<<(std::ostream& os, const Trace& tr);
169 
170     class DummyTransport : public Transport
171     {
172         UUID uuid_;
173         std::deque<Datagram*> out_;
174         bool queue_;
175 
176     public:
177 
DummyTransport(const UUID & uuid=UUID::nil (),bool queue=true,const gu::URI & uri=gu::URI ("dummy:"))178         DummyTransport(const UUID& uuid = UUID::nil(), bool queue = true,
179                        const gu::URI& uri = gu::URI("dummy:")) :
180             Transport(*std::auto_ptr<Protonet>
181                       (Protonet::create(check_trace_conf())), uri),
182             uuid_(uuid),
183             out_(),
184             queue_(queue)
185         {}
186 
~DummyTransport()187         ~DummyTransport()
188         {
189             out_.clear();
190         }
191 
uuid() const192         const UUID& uuid() const { return uuid_; }
193 
mtu() const194         size_t mtu() const { return (1U << 31); }
195 
connect(bool first)196         void connect(bool first) { }
197 
close(bool force)198         void close(bool force) { }
close(const UUID &)199         void close(const UUID&) { }
200 
connect()201         void connect() { }
202 
listen()203         void listen()
204         {
205             gu_throw_fatal << "not implemented";
206         }
207 
accept()208         Transport *accept()
209         {
210             gu_throw_fatal << "not implemented";
211             return 0;
212         }
213 
handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)214         void handle_up(const void* cid, const Datagram& rb,
215                        const ProtoUpMeta& um)
216         {
217             send_up(rb, um);
218         }
219 
set_queueing(bool val)220         void set_queueing(bool val) { queue_ = val; }
221 
handle_down(Datagram & wb,const ProtoDownMeta & dm)222         int handle_down(Datagram& wb, const ProtoDownMeta& dm)
223         {
224             if (queue_ == true)
225             {
226                 // assert(wb.header().size() == 0);
227                 out_.push_back(new Datagram(wb));
228                 return 0;
229             }
230             else
231             {
232                 gu_trace(return send_down(wb, ProtoDownMeta(0xff, O_UNRELIABLE, uuid_)));
233             }
234         }
235 
empty() const236         bool empty() const { return out_.empty(); }
237 
out()238         Datagram* out()
239         {
240             if (out_.empty())
241             {
242                 return 0;
243             }
244             Datagram* rb = out_.front();
245             out_.pop_front();
246             return rb;
247         }
248     };
249 
250 
251     class DummyNode : public Toplay
252     {
253     public:
DummyNode(gu::Config & conf,const size_t index,const std::list<Protolay * > & protos)254         DummyNode(gu::Config& conf,
255                   const size_t index,
256                   const std::list<Protolay*>& protos) :
257             Toplay (conf),
258             index_  (index),
259             uuid_   (UUID(static_cast<int32_t>(index))),
260             protos_ (protos),
261             cvi_    (),
262             tr_     (),
263             curr_seq_(0)
264         {
265             gcomm_assert(protos_.empty() == false);
266             std::list<Protolay*>::iterator i, i_next;
267             i = i_next = protos_.begin();
268             for (++i_next; i_next != protos_.end(); ++i, ++i_next)
269             {
270                 gu_trace(gcomm::connect(*i, *i_next));
271             }
272             gu_trace(gcomm::connect(*i, this));
273         }
274 
~DummyNode()275         ~DummyNode()
276         {
277             try
278             {
279                 std::list<Protolay*>::iterator i, i_next;
280                 i = i_next = protos_.begin();
281                 for (++i_next; i_next != protos_.end(); ++i, ++i_next)
282                 {
283                     gu_trace(gcomm::disconnect(*i, *i_next));
284                 }
285                 gu_trace(gcomm::disconnect(*i, this));
286                 std::for_each(protos_.begin(), protos_.end(), gu::DeleteObject());
287             }
288             catch(std::exception& e)
289             {
290                 log_fatal << e.what();
291                 abort();
292             }
293         }
294 
295 
uuid() const296         const UUID& uuid() const { return uuid_; }
297 
protos()298         std::list<Protolay*>& protos() { return protos_; }
299 
index() const300         size_t index() const { return index_; }
301 
connect(bool first)302         void connect(bool first)
303         {
304             gu_trace(std::for_each(protos_.rbegin(), protos_.rend(),
305                                    std::bind2nd(
306                                        std::mem_fun(&Protolay::connect), first)));
307         }
308 
close(bool force=false)309         void close(bool force = false)
310         {
311             for (std::list<Protolay*>::iterator i = protos_.begin();
312                  i != protos_.end(); ++i)
313             {
314                 (*i)->close();
315             }
316             // gu_trace(std::for_each(protos.rbegin(), protos.rend(),
317             //                       std::mem_fun(&Protolay::close)));
318         }
319 
320 
close(const UUID & uuid)321         void close(const UUID& uuid)
322         {
323             for (std::list<Protolay*>::iterator i = protos_.begin();
324                  i != protos_.end(); ++i)
325             {
326                 (*i)->close(uuid);
327             }
328             // gu_trace(std::for_each(protos.rbegin(), protos.rend(),
329             //                       std::mem_fun(&Protolay::close)));
330         }
331 
send()332         void send()
333         {
334             const int64_t seq(curr_seq_);
335             gu::byte_t buf[sizeof(seq)];
336             size_t sz;
337             gu_trace(sz = gu::serialize8(seq, buf, sizeof(buf), 0));
338             Datagram dg(gu::Buffer(buf, buf + sz));
339             int err = send_down(dg, ProtoDownMeta(0));
340             if (err != 0)
341             {
342                 log_debug << "failed to send: " << strerror(err);
343             }
344             else
345             {
346                 ++curr_seq_;
347             }
348         }
349 
create_datagram()350         Datagram create_datagram()
351         {
352             const int64_t seq(curr_seq_);
353             gu::byte_t buf[sizeof(seq)];
354             size_t sz;
355             gu_trace(sz = gu::serialize8(seq, buf, sizeof(buf), 0));
356             return Datagram (gu::Buffer(buf, buf + sz));
357         }
358 
trace() const359         const Trace& trace() const { return tr_; }
360 
set_cvi(const ViewId & vi)361         void set_cvi(const ViewId& vi)
362         {
363             log_debug << uuid() << " setting cvi to " << vi;
364             cvi_ = vi;
365         }
366 
in_cvi() const367         bool in_cvi() const
368         {
369             for (Trace::ViewTraceMap::const_reverse_iterator i(
370                      tr_.view_traces().rbegin());
371                  i != tr_.view_traces().rend(); ++i)
372             {
373                 if (i->first.uuid() == cvi_.uuid() &&
374                     i->first.type() == cvi_.type() &&
375                     i->first.seq()  >= cvi_.seq())
376                 {
377                     return true;
378                 }
379             }
380             return false;
381         }
382 
handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)383         void handle_up(const void* cid, const Datagram& rb,
384                        const ProtoUpMeta& um)
385         {
386             if (rb.len() != 0)
387             {
388                 gcomm_assert((um.source() == UUID::nil()) == false);
389                 // assert(rb.header().size() == 0);
390                 const gu::byte_t* begin(gcomm::begin(rb));
391                 const size_t available(gcomm::available(rb));
392 
393 
394                 // log_debug << um.source() << " " << uuid()
395                 //         << " " << available ;
396                 // log_debug << rb.len() << " " << rb.offset() << " "
397                 //         << rb.header_len();
398                 if (available != 8)
399                 {
400                     log_info << "check_trace fail: " << available;
401                 }
402                 gcomm_assert(available == 8);
403                 int64_t seq;
404                 gu_trace(gu::unserialize8(begin,
405                                           available,
406                                           0,
407                                           seq));
408                 tr_.insert_msg(TraceMsg(um.source(), um.source_view_id(),
409                                         seq));
410             }
411             else
412             {
413                 gcomm_assert(um.has_view() == true);
414                 tr_.insert_view(um.view());
415             }
416         }
417 
418 
handle_timers()419         gu::datetime::Date handle_timers()
420         {
421             std::for_each(protos_.begin(), protos_.end(),
422                           std::mem_fun(&Protolay::handle_timers));
423             return gu::datetime::Date::max();
424         }
425 
426     private:
427         size_t index_;
428         UUID uuid_;
429         std::list<Protolay*> protos_;
430         ViewId cvi_;
431         Trace tr_;
432         int64_t curr_seq_;
433     };
434 
435 
436 
437     class ChannelMsg
438     {
439     public:
ChannelMsg(const Datagram & rb,const UUID & source)440         ChannelMsg(const Datagram& rb, const UUID& source) :
441             rb_(rb),
442             source_(source)
443         {
444         }
rb() const445         const Datagram& rb() const { return rb_; }
source() const446         const UUID& source() const { return source_; }
447     private:
448         Datagram rb_;
449         UUID source_;
450     };
451 
452 
453     class Channel : public Bottomlay
454     {
455     public:
Channel(gu::Config & conf,const size_t ttl=1,const size_t latency=1,const double loss=1.)456         Channel(gu::Config& conf,
457                 const size_t ttl = 1,
458                 const size_t latency = 1,
459                 const double loss = 1.) :
460             Bottomlay(conf),
461             ttl_(ttl),
462             latency_(latency),
463             loss_(loss),
464             queue_()
465         { }
466 
467 
468 
~Channel()469         ~Channel() { }
470 
handle_down(Datagram & wb,const ProtoDownMeta & dm)471         int handle_down(Datagram& wb, const ProtoDownMeta& dm)
472         {
473             gcomm_assert((dm.source() == UUID::nil()) == false);
474             gu_trace(put(wb, dm.source()));
475             return 0;
476         }
477 
478         void put(const Datagram& rb, const UUID& source);
479         ChannelMsg get();
set_ttl(const size_t t)480         void set_ttl(const size_t t) { ttl_ = t; }
ttl() const481         size_t ttl() const { return ttl_; }
set_latency(const size_t l)482         void set_latency(const size_t l)
483         {
484             gcomm_assert(l > 0);
485             latency_ = l;
486         }
latency() const487         size_t latency() const { return latency_; }
set_loss(const double l)488         void set_loss(const double l) { loss_ = l; }
loss() const489         double loss() const { return loss_; }
n_msgs() const490         size_t n_msgs() const
491         {
492             return queue_.size();
493         }
494     private:
495         size_t ttl_;
496         size_t latency_;
497         double loss_;
498         std::deque<std::pair<size_t, ChannelMsg> > queue_;
499     };
500 
501 
502     std::ostream& operator<<(std::ostream& os, const Channel& ch);
503     std::ostream& operator<<(std::ostream& os, const Channel* ch);
504 
505 
506 
507 
508     class MatrixElem
509     {
510     public:
MatrixElem(const size_t ii,const size_t jj)511         MatrixElem(const size_t ii, const size_t jj) : ii_(ii), jj_(jj) { }
ii() const512         size_t ii() const { return ii_; }
jj() const513         size_t jj() const { return jj_; }
operator <(const MatrixElem & cmp) const514         bool operator<(const MatrixElem& cmp) const
515         {
516             return (ii_ < cmp.ii_ || (ii_ == cmp.ii_ && jj_ < cmp.jj_));
517         }
518     private:
519         size_t ii_;
520         size_t jj_;
521     };
522 
523     std::ostream& operator<<(std::ostream& os, const MatrixElem& me);
524 
525     class ChannelMap : public  Map<MatrixElem, Channel*>
526     {
527     public:
528         struct DeleteObject
529         {
operator ()gcomm::ChannelMap::DeleteObject530             void operator()(ChannelMap::value_type& vt)
531             {
532                 delete ChannelMap::value(vt);
533             }
534         };
535     };
536     class NodeMap : public Map<size_t, DummyNode*> {
537     public:
538         struct DeleteObject
539         {
operator ()gcomm::NodeMap::DeleteObject540             void operator()(NodeMap::value_type& vt)
541             {
542                 delete NodeMap::value(vt);
543             }
544         };
545 
546     };
547 
548     class PropagationMatrix
549     {
550     public:
PropagationMatrix()551         PropagationMatrix() : tp_(), prop_()
552         {
553             // Some tests which deal with timer expiration require that
554             // the current time is far enough from zero. Start from
555             // 100 secs after zero, this should give enough headroom
556             // for all tests.
557             gu::datetime::SimClock::init(100*gu::datetime::Sec);
558             // Uncomment this to get logs with simulated timestamps.
559             // The low will be written into stderr.
560             // gu_log_cb = check_trace_log_cb;
561         }
562         ~PropagationMatrix();
563 
564         void insert_tp(DummyNode* t);
565         void set_latency(const size_t ii, const size_t jj, const size_t lat);
566         void set_loss(const size_t ii, const size_t jj, const double loss);
567         void split(const size_t ii, const size_t jj);
568         void merge(const size_t ii, const size_t jj, const double loss = 1.0);
569         void propagate_n(size_t n);
570         void propagate_until_empty();
571         void propagate_until_cvi(bool handle_timers);
572         friend std::ostream& operator<<(std::ostream&,const PropagationMatrix&);
573     private:
574         void expire_timers();
575 
576 
577         size_t count_channel_msgs() const;
578         bool all_in_cvi() const;
579 
580         NodeMap    tp_;
581         ChannelMap prop_;
582     };
583 
584 
585     std::ostream& operator<<(std::ostream& os, const PropagationMatrix& prop);
586 
587     // Cross check traces from vector of dummy nodes
588     void check_trace(const std::vector<DummyNode*>& nvec);
589 
590 } // namespace gcomm
591