1 /*
2  * Copyright (C) 2009-2014 Codership Oy <info@codership.com>
3  *
4  * $Id$
5  */
6 
7 /*!
8  * Classes for tracing views and messages
9  */
10 
11 #include "gu_uri.hpp"
12 #include "gu_datetime.hpp"
13 
14 #include "gcomm/datagram.hpp"
15 #include "gcomm/uuid.hpp"
16 #include "gcomm/protolay.hpp"
17 #include "gcomm/protostack.hpp"
18 #include "gcomm/transport.hpp"
19 #include "gcomm/map.hpp"
20 #include "gcomm/util.hpp"
21 
22 #include <vector>
23 #include <deque>
24 #include <functional>
25 
26 gu::Config& check_trace_conf();
27 
28 extern "C" void check_trace_log_cb(int, const char*);
29 
30 namespace gcomm
31 {
32     class TraceMsg
33     {
34     public:
TraceMsg(const UUID & source=UUID::nil (),const ViewId & source_view_id=ViewId (),const int64_t seq=-1)35         TraceMsg(const UUID& source           = UUID::nil(),
36                  const ViewId& source_view_id = ViewId(),
37                  const int64_t seq            = -1) :
38             source_(source),
39             source_view_id_(source_view_id),
40             seq_(seq)
41         { }
42 
source() const43         const UUID& source() const { return source_; }
44 
source_view_id() const45         const ViewId& source_view_id() const { return source_view_id_; }
46 
seq() const47         int64_t seq() const { return seq_; }
48 
operator ==(const TraceMsg & cmp) const49         bool operator==(const TraceMsg& cmp) const
50         {
51             return (source_         == cmp.source_         &&
52                     source_view_id_ == cmp.source_view_id_ &&
53                     seq_            == cmp.seq_              );
54 
55         }
56 
57     private:
58         UUID    source_;
59         ViewId  source_view_id_;
60         int64_t seq_;
61     };
62 
63     std::ostream& operator<<(std::ostream& os, const TraceMsg& msg);
64 
65     class ViewTrace
66     {
67     public:
ViewTrace(const View & view)68         ViewTrace(const View& view) : view_(view), msgs_() { }
69 
insert_msg(const TraceMsg & msg)70         void insert_msg(const TraceMsg& msg)
71         {
72             switch (view_.type())
73             {
74             case V_REG:
75                 gcomm_assert(view_.id() == msg.source_view_id());
76                 gcomm_assert(contains(msg.source()) == true)
77                     << "msg source " << msg.source() << " not int view "
78                     << view_;
79                 break;
80             case V_TRANS:
81                 gcomm_assert(view_.id().uuid() ==
82                              msg.source_view_id().uuid() &&
83                              view_.id().seq() ==
84                              msg.source_view_id().seq());
85                 break;
86             case V_NON_PRIM:
87                 break;
88             case V_PRIM:
89                 gcomm_assert(view_.id() == msg.source_view_id())
90                     << " view id " << view_.id()
91                     <<  " source view " << msg.source_view_id();
92                 gcomm_assert(contains(msg.source()) == true);
93                 break;
94             case V_NONE:
95                 gu_throw_fatal;
96                 break;
97             }
98 
99             if (view_.type() != V_NON_PRIM)
100             {
101                 msgs_.push_back(msg);
102             }
103         }
104 
view() const105         const View& view() const { return view_; }
106 
msgs() const107         const std::deque<TraceMsg>& msgs() const { return msgs_; }
108 
operator ==(const ViewTrace & cmp) const109         bool operator==(const ViewTrace& cmp) const
110         {
111             // Note: Cannot compare joining members since seen differently
112             // on different merging subsets
113             return (view_.members()     == cmp.view_.members()     &&
114                     view_.left()        == cmp.view_.left()        &&
115                     view_.partitioned() == cmp.view_.partitioned() &&
116                     msgs_              == cmp.msgs_                     );
117         }
118     private:
119 
contains(const UUID & uuid) const120         bool contains(const UUID& uuid) const
121         {
122             return (view_.members().find(uuid) != view_.members().end() ||
123                     view_.left().find(uuid)    != view_.left().end() ||
124                     view_.partitioned().find(uuid) !=view_.partitioned().end());
125         }
126 
127         View       view_;
128         std::deque<TraceMsg> msgs_;
129     };
130 
131     std::ostream& operator<<(std::ostream& os, const ViewTrace& vtr);
132 
133 
134     class Trace
135     {
136     public:
137         class ViewTraceMap : public Map<ViewId, ViewTrace> { };
138 
Trace()139         Trace() : views_(), current_view_(views_.end()) { }
140 
insert_view(const View & view)141         void insert_view(const View& view)
142         {
143             gu_trace(current_view_ = views_.insert_unique(
144                          std::make_pair(view.id(), ViewTrace(view))));
145 
146             log_debug << view;
147         }
insert_msg(const TraceMsg & msg)148         void insert_msg(const TraceMsg& msg)
149         {
150             gcomm_assert(current_view_ != views_.end())
151                 << "no view set before msg delivery";
152             gu_trace(ViewTraceMap::value(current_view_).insert_msg(msg));
153         }
view_traces() const154         const ViewTraceMap& view_traces() const { return views_; }
155 
current_view_trace() const156         const ViewTrace& current_view_trace() const
157         {
158             gcomm_assert(current_view_ != views_.end());
159             return ViewTraceMap::value(current_view_);
160         }
161 
162     private:
163         ViewTraceMap views_;
164         ViewTraceMap::iterator current_view_;
165     };
166 
167 
168     std::ostream& operator<<(std::ostream& os, const Trace& tr);
169 
170     class DummyTransport : public Transport
171     {
172         UUID uuid_;
173         std::deque<Datagram*> out_;
174         bool queue_;
175         static std::unique_ptr<Protonet> net_;
176         Protonet& get_net();
177     public:
178 
DummyTransport(const UUID & uuid=UUID::nil (),bool queue=true,const gu::URI & uri=gu::URI ("dummy:"))179         DummyTransport(const UUID& uuid = UUID::nil(), bool queue = true,
180                        const gu::URI& uri = gu::URI("dummy:")) :
181             Transport(get_net(), uri),
182             uuid_(uuid),
183             out_(),
184             queue_(queue)
185         {}
186 
~DummyTransport()187         ~DummyTransport()
188         {
189             out_.clear();
190         }
191 
uuid() const192         const UUID& uuid() const { return uuid_; }
193 
mtu() const194         size_t mtu() const { return (1U << 31); }
195 
connect(bool first)196         void connect(bool first) { }
197 
close(bool force)198         void close(bool force) { }
close(const UUID &)199         void close(const UUID&) { }
200 
connect()201         void connect() { }
202 
listen()203         void listen()
204         {
205             gu_throw_fatal << "not implemented";
206         }
207 
accept()208         Transport *accept()
209         {
210             gu_throw_fatal << "not implemented";
211             return 0;
212         }
213 
handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)214         void handle_up(const void* cid, const Datagram& rb,
215                        const ProtoUpMeta& um)
216         {
217             send_up(rb, um);
218         }
219 
set_queueing(bool val)220         void set_queueing(bool val) { queue_ = val; }
221 
handle_down(Datagram & wb,const ProtoDownMeta & dm)222         int handle_down(Datagram& wb, const ProtoDownMeta& dm)
223         {
224             if (queue_ == true)
225             {
226                 // assert(wb.header().size() == 0);
227                 out_.push_back(new Datagram(wb));
228                 return 0;
229             }
230             else
231             {
232                 gu_trace(return send_down(wb, ProtoDownMeta(0xff, O_UNRELIABLE, uuid_)));
233             }
234         }
235 
empty() const236         bool empty() const { return out_.empty(); }
237 
out()238         Datagram* out()
239         {
240             if (out_.empty())
241             {
242                 return 0;
243             }
244             Datagram* rb = out_.front();
245             out_.pop_front();
246             return rb;
247         }
248     };
249 
250 
251     class DummyNode : public Toplay
252     {
253     public:
DummyNode(gu::Config & conf,const size_t index,const gcomm::UUID & uuid,const std::list<Protolay * > & protos)254         DummyNode(gu::Config& conf,
255                   const size_t index,
256                   const gcomm::UUID& uuid,
257                   const std::list<Protolay*>& protos) :
258             Toplay (conf),
259             index_  (index),
260             uuid_   (uuid),
261             protos_ (protos),
262             cvi_    (),
263             tr_     (),
264             curr_seq_(0)
265         {
266             gcomm_assert(protos_.empty() == false);
267             std::list<Protolay*>::iterator i, i_next;
268             i = i_next = protos_.begin();
269             for (++i_next; i_next != protos_.end(); ++i, ++i_next)
270             {
271                 gu_trace(gcomm::connect(*i, *i_next));
272             }
273             gu_trace(gcomm::connect(*i, this));
274         }
275 
~DummyNode()276         ~DummyNode()
277         {
278             try
279             {
280                 std::list<Protolay*>::iterator i, i_next;
281                 i = i_next = protos_.begin();
282                 for (++i_next; i_next != protos_.end(); ++i, ++i_next)
283                 {
284                     gu_trace(gcomm::disconnect(*i, *i_next));
285                 }
286                 gu_trace(gcomm::disconnect(*i, this));
287                 std::for_each(protos_.begin(), protos_.end(), gu::DeleteObject());
288             }
289             catch(std::exception& e)
290             {
291                 log_fatal << e.what();
292                 abort();
293             }
294         }
295 
296 
uuid() const297         const UUID& uuid() const { return uuid_; }
298 
protos()299         std::list<Protolay*>& protos() { return protos_; }
300 
index() const301         size_t index() const { return index_; }
302 
connect(bool first)303         void connect(bool first)
304         {
305             gu_trace(std::for_each(protos_.rbegin(), protos_.rend(),
306                                    std::bind2nd(
307                                        std::mem_fun(&Protolay::connect), first)));
308         }
309 
close(bool force=false)310         void close(bool force = false)
311         {
312             for (std::list<Protolay*>::iterator i = protos_.begin();
313                  i != protos_.end(); ++i)
314             {
315                 (*i)->close();
316             }
317             // gu_trace(std::for_each(protos.rbegin(), protos.rend(),
318             //                       std::mem_fun(&Protolay::close)));
319         }
320 
321 
close(const UUID & uuid)322         void close(const UUID& uuid)
323         {
324             for (std::list<Protolay*>::iterator i = protos_.begin();
325                  i != protos_.end(); ++i)
326             {
327                 (*i)->close(uuid);
328             }
329             // gu_trace(std::for_each(protos.rbegin(), protos.rend(),
330             //                       std::mem_fun(&Protolay::close)));
331         }
332 
send()333         void send()
334         {
335             const int64_t seq(curr_seq_);
336             gu::byte_t buf[sizeof(seq)];
337             size_t sz;
338             gu_trace(sz = gu::serialize8(seq, buf, sizeof(buf), 0));
339             Datagram dg(gu::Buffer(buf, buf + sz));
340             int err = send_down(dg, ProtoDownMeta(0));
341             if (err != 0)
342             {
343                 log_debug << "failed to send: " << strerror(err);
344             }
345             else
346             {
347                 ++curr_seq_;
348             }
349         }
350 
create_datagram()351         Datagram create_datagram()
352         {
353             const int64_t seq(curr_seq_);
354             gu::byte_t buf[sizeof(seq)];
355             size_t sz;
356             gu_trace(sz = gu::serialize8(seq, buf, sizeof(buf), 0));
357             return Datagram (gu::Buffer(buf, buf + sz));
358         }
359 
trace() const360         const Trace& trace() const { return tr_; }
361 
set_cvi(const ViewId & vi)362         void set_cvi(const ViewId& vi)
363         {
364             log_debug << uuid() << " setting cvi to " << vi;
365             cvi_ = vi;
366         }
367 
in_cvi() const368         bool in_cvi() const
369         {
370             for (Trace::ViewTraceMap::const_reverse_iterator i(
371                      tr_.view_traces().rbegin());
372                  i != tr_.view_traces().rend(); ++i)
373             {
374                 if (i->first.uuid() == cvi_.uuid() &&
375                     i->first.type() == cvi_.type() &&
376                     i->first.seq()  >= cvi_.seq())
377                 {
378                     return true;
379                 }
380             }
381             return false;
382         }
383 
handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)384         void handle_up(const void* cid, const Datagram& rb,
385                        const ProtoUpMeta& um)
386         {
387             if (rb.len() != 0)
388             {
389                 gcomm_assert((um.source() == UUID::nil()) == false);
390                 // assert(rb.header().size() == 0);
391                 const gu::byte_t* begin(gcomm::begin(rb));
392                 const size_t available(gcomm::available(rb));
393 
394 
395                 // log_debug << um.source() << " " << uuid()
396                 //         << " " << available ;
397                 // log_debug << rb.len() << " " << rb.offset() << " "
398                 //         << rb.header_len();
399                 if (available != 8)
400                 {
401                     log_info << "check_trace fail: " << available;
402                 }
403                 gcomm_assert(available == 8);
404                 int64_t seq;
405                 gu_trace(gu::unserialize8(begin,
406                                           available,
407                                           0,
408                                           seq));
409                 tr_.insert_msg(TraceMsg(um.source(), um.source_view_id(),
410                                         seq));
411             }
412             else
413             {
414                 gcomm_assert(um.has_view() == true);
415                 tr_.insert_view(um.view());
416             }
417         }
418 
419 
handle_timers()420         gu::datetime::Date handle_timers()
421         {
422             std::for_each(protos_.begin(), protos_.end(),
423                           std::mem_fun(&Protolay::handle_timers));
424             return gu::datetime::Date::max();
425         }
426 
427     private:
428         size_t index_;
429         UUID uuid_;
430         std::list<Protolay*> protos_;
431         ViewId cvi_;
432         Trace tr_;
433         int64_t curr_seq_;
434     };
435 
436 
437 
438     class ChannelMsg
439     {
440     public:
ChannelMsg(const Datagram & rb,const UUID & source)441         ChannelMsg(const Datagram& rb, const UUID& source) :
442             rb_(rb),
443             source_(source)
444         {
445         }
rb() const446         const Datagram& rb() const { return rb_; }
source() const447         const UUID& source() const { return source_; }
448     private:
449         Datagram rb_;
450         UUID source_;
451     };
452 
453 
454     class Channel : public Bottomlay
455     {
456     public:
Channel(gu::Config & conf,const size_t ttl=1,const size_t latency=1,const double loss=1.)457         Channel(gu::Config& conf,
458                 const size_t ttl = 1,
459                 const size_t latency = 1,
460                 const double loss = 1.) :
461             Bottomlay(conf),
462             ttl_(ttl),
463             latency_(latency),
464             loss_(loss),
465             queue_()
466         { }
467 
468 
469 
~Channel()470         ~Channel() { }
471 
handle_down(Datagram & wb,const ProtoDownMeta & dm)472         int handle_down(Datagram& wb, const ProtoDownMeta& dm)
473         {
474             gcomm_assert((dm.source() == UUID::nil()) == false);
475             gu_trace(put(wb, dm.source()));
476             return 0;
477         }
478 
479         void put(const Datagram& rb, const UUID& source);
480         ChannelMsg get();
set_ttl(const size_t t)481         void set_ttl(const size_t t) { ttl_ = t; }
ttl() const482         size_t ttl() const { return ttl_; }
set_latency(const size_t l)483         void set_latency(const size_t l)
484         {
485             gcomm_assert(l > 0);
486             latency_ = l;
487         }
latency() const488         size_t latency() const { return latency_; }
set_loss(const double l)489         void set_loss(const double l) { loss_ = l; }
loss() const490         double loss() const { return loss_; }
n_msgs() const491         size_t n_msgs() const
492         {
493             return queue_.size();
494         }
495     private:
496         size_t ttl_;
497         size_t latency_;
498         double loss_;
499         std::deque<std::pair<size_t, ChannelMsg> > queue_;
500     };
501 
502 
503     std::ostream& operator<<(std::ostream& os, const Channel& ch);
504     std::ostream& operator<<(std::ostream& os, const Channel* ch);
505 
506 
507 
508 
509     class MatrixElem
510     {
511     public:
MatrixElem(const size_t ii,const size_t jj)512         MatrixElem(const size_t ii, const size_t jj) : ii_(ii), jj_(jj) { }
ii() const513         size_t ii() const { return ii_; }
jj() const514         size_t jj() const { return jj_; }
operator <(const MatrixElem & cmp) const515         bool operator<(const MatrixElem& cmp) const
516         {
517             return (ii_ < cmp.ii_ || (ii_ == cmp.ii_ && jj_ < cmp.jj_));
518         }
519     private:
520         size_t ii_;
521         size_t jj_;
522     };
523 
524     std::ostream& operator<<(std::ostream& os, const MatrixElem& me);
525 
526     class ChannelMap : public  Map<MatrixElem, Channel*>
527     {
528     public:
529         struct DeleteObject
530         {
operator ()gcomm::ChannelMap::DeleteObject531             void operator()(ChannelMap::value_type& vt)
532             {
533                 delete ChannelMap::value(vt);
534             }
535         };
536     };
537     class NodeMap : public Map<size_t, DummyNode*> {
538     public:
539         struct DeleteObject
540         {
operator ()gcomm::NodeMap::DeleteObject541             void operator()(NodeMap::value_type& vt)
542             {
543                 delete NodeMap::value(vt);
544             }
545         };
546 
547     };
548 
549     class PropagationMatrix
550     {
551     public:
PropagationMatrix()552         PropagationMatrix() : tp_(), prop_()
553         {
554             // Some tests which deal with timer expiration require that
555             // the current time is far enough from zero. Start from
556             // 100 secs after zero, this should give enough headroom
557             // for all tests.
558             gu::datetime::SimClock::init(100*gu::datetime::Sec);
559             // Uncomment this to get logs with simulated timestamps.
560             // The low will be written into stderr.
561             // gu_log_cb = check_trace_log_cb;
562         }
563         ~PropagationMatrix();
564 
565         void insert_tp(DummyNode* t);
566         void set_latency(const size_t ii, const size_t jj, const size_t lat);
567         void set_loss(const size_t ii, const size_t jj, const double loss);
568         void split(const size_t ii, const size_t jj);
569         void merge(const size_t ii, const size_t jj, const double loss = 1.0);
570         void propagate_n(size_t n);
571         void propagate_until_empty();
572         void propagate_until_cvi(bool handle_timers);
573         friend std::ostream& operator<<(std::ostream&,const PropagationMatrix&);
574     private:
575         void expire_timers();
576 
577 
578         size_t count_channel_msgs() const;
579         bool all_in_cvi() const;
580 
581         NodeMap    tp_;
582         ChannelMap prop_;
583     };
584 
585 
586     std::ostream& operator<<(std::ostream& os, const PropagationMatrix& prop);
587 
588     // Cross check traces from vector of dummy nodes
589     void check_trace(const std::vector<DummyNode*>& nvec);
590 
591 } // namespace gcomm
592