1 /* 2 * Copyright (C) 2009-2014 Codership Oy <info@codership.com> 3 * 4 * $Id$ 5 */ 6 7 /*! 8 * Classes for tracing views and messages 9 */ 10 11 #include "gu_uri.hpp" 12 #include "gu_datetime.hpp" 13 14 #include "gcomm/datagram.hpp" 15 #include "gcomm/uuid.hpp" 16 #include "gcomm/protolay.hpp" 17 #include "gcomm/protostack.hpp" 18 #include "gcomm/transport.hpp" 19 #include "gcomm/map.hpp" 20 #include "gcomm/util.hpp" 21 22 #include <vector> 23 #include <deque> 24 #include <functional> 25 26 gu::Config& check_trace_conf(); 27 28 extern "C" void check_trace_log_cb(int, const char*); 29 30 namespace gcomm 31 { 32 class TraceMsg 33 { 34 public: TraceMsg(const UUID & source=UUID::nil (),const ViewId & source_view_id=ViewId (),const int64_t seq=-1)35 TraceMsg(const UUID& source = UUID::nil(), 36 const ViewId& source_view_id = ViewId(), 37 const int64_t seq = -1) : 38 source_(source), 39 source_view_id_(source_view_id), 40 seq_(seq) 41 { } 42 source() const43 const UUID& source() const { return source_; } 44 source_view_id() const45 const ViewId& source_view_id() const { return source_view_id_; } 46 seq() const47 int64_t seq() const { return seq_; } 48 operator ==(const TraceMsg & cmp) const49 bool operator==(const TraceMsg& cmp) const 50 { 51 return (source_ == cmp.source_ && 52 source_view_id_ == cmp.source_view_id_ && 53 seq_ == cmp.seq_ ); 54 55 } 56 57 private: 58 UUID source_; 59 ViewId source_view_id_; 60 int64_t seq_; 61 }; 62 63 std::ostream& operator<<(std::ostream& os, const TraceMsg& msg); 64 65 class ViewTrace 66 { 67 public: ViewTrace(const View & view)68 ViewTrace(const View& view) : view_(view), msgs_() { } 69 insert_msg(const TraceMsg & msg)70 void insert_msg(const TraceMsg& msg) 71 { 72 switch (view_.type()) 73 { 74 case V_REG: 75 gcomm_assert(view_.id() == msg.source_view_id()); 76 gcomm_assert(contains(msg.source()) == true) 77 << "msg source " << msg.source() << " not int view " 78 << view_; 79 break; 80 case V_TRANS: 81 gcomm_assert(view_.id().uuid() == 82 msg.source_view_id().uuid() && 83 view_.id().seq() == 84 msg.source_view_id().seq()); 85 break; 86 case V_NON_PRIM: 87 break; 88 case V_PRIM: 89 gcomm_assert(view_.id() == msg.source_view_id()) 90 << " view id " << view_.id() 91 << " source view " << msg.source_view_id(); 92 gcomm_assert(contains(msg.source()) == true); 93 break; 94 case V_NONE: 95 gu_throw_fatal; 96 break; 97 } 98 99 if (view_.type() != V_NON_PRIM) 100 { 101 msgs_.push_back(msg); 102 } 103 } 104 view() const105 const View& view() const { return view_; } 106 msgs() const107 const std::deque<TraceMsg>& msgs() const { return msgs_; } 108 operator ==(const ViewTrace & cmp) const109 bool operator==(const ViewTrace& cmp) const 110 { 111 // Note: Cannot compare joining members since seen differently 112 // on different merging subsets 113 return (view_.members() == cmp.view_.members() && 114 view_.left() == cmp.view_.left() && 115 view_.partitioned() == cmp.view_.partitioned() && 116 msgs_ == cmp.msgs_ ); 117 } 118 private: 119 contains(const UUID & uuid) const120 bool contains(const UUID& uuid) const 121 { 122 return (view_.members().find(uuid) != view_.members().end() || 123 view_.left().find(uuid) != view_.left().end() || 124 view_.partitioned().find(uuid) !=view_.partitioned().end()); 125 } 126 127 View view_; 128 std::deque<TraceMsg> msgs_; 129 }; 130 131 std::ostream& operator<<(std::ostream& os, const ViewTrace& vtr); 132 133 134 class Trace 135 { 136 public: 137 class ViewTraceMap : public Map<ViewId, ViewTrace> { }; 138 Trace()139 Trace() : views_(), current_view_(views_.end()) { } 140 insert_view(const View & view)141 void insert_view(const View& view) 142 { 143 gu_trace(current_view_ = views_.insert_unique( 144 std::make_pair(view.id(), ViewTrace(view)))); 145 146 log_debug << view; 147 } insert_msg(const TraceMsg & msg)148 void insert_msg(const TraceMsg& msg) 149 { 150 gcomm_assert(current_view_ != views_.end()) 151 << "no view set before msg delivery"; 152 gu_trace(ViewTraceMap::value(current_view_).insert_msg(msg)); 153 } view_traces() const154 const ViewTraceMap& view_traces() const { return views_; } 155 current_view_trace() const156 const ViewTrace& current_view_trace() const 157 { 158 gcomm_assert(current_view_ != views_.end()); 159 return ViewTraceMap::value(current_view_); 160 } 161 162 private: 163 ViewTraceMap views_; 164 ViewTraceMap::iterator current_view_; 165 }; 166 167 168 std::ostream& operator<<(std::ostream& os, const Trace& tr); 169 170 class DummyTransport : public Transport 171 { 172 UUID uuid_; 173 std::deque<Datagram*> out_; 174 bool queue_; 175 static std::unique_ptr<Protonet> net_; 176 Protonet& get_net(); 177 public: 178 DummyTransport(const UUID & uuid=UUID::nil (),bool queue=true,const gu::URI & uri=gu::URI ("dummy:"))179 DummyTransport(const UUID& uuid = UUID::nil(), bool queue = true, 180 const gu::URI& uri = gu::URI("dummy:")) : 181 Transport(get_net(), uri), 182 uuid_(uuid), 183 out_(), 184 queue_(queue) 185 {} 186 ~DummyTransport()187 ~DummyTransport() 188 { 189 out_.clear(); 190 } 191 uuid() const192 const UUID& uuid() const { return uuid_; } 193 mtu() const194 size_t mtu() const { return (1U << 31); } 195 connect(bool first)196 void connect(bool first) { } 197 close(bool force)198 void close(bool force) { } close(const UUID &)199 void close(const UUID&) { } 200 connect()201 void connect() { } 202 listen()203 void listen() 204 { 205 gu_throw_fatal << "not implemented"; 206 } 207 accept()208 Transport *accept() 209 { 210 gu_throw_fatal << "not implemented"; 211 return 0; 212 } 213 handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)214 void handle_up(const void* cid, const Datagram& rb, 215 const ProtoUpMeta& um) 216 { 217 send_up(rb, um); 218 } 219 set_queueing(bool val)220 void set_queueing(bool val) { queue_ = val; } 221 handle_down(Datagram & wb,const ProtoDownMeta & dm)222 int handle_down(Datagram& wb, const ProtoDownMeta& dm) 223 { 224 if (queue_ == true) 225 { 226 // assert(wb.header().size() == 0); 227 out_.push_back(new Datagram(wb)); 228 return 0; 229 } 230 else 231 { 232 gu_trace(return send_down(wb, ProtoDownMeta(0xff, O_UNRELIABLE, uuid_))); 233 } 234 } 235 empty() const236 bool empty() const { return out_.empty(); } 237 out()238 Datagram* out() 239 { 240 if (out_.empty()) 241 { 242 return 0; 243 } 244 Datagram* rb = out_.front(); 245 out_.pop_front(); 246 return rb; 247 } 248 }; 249 250 251 class DummyNode : public Toplay 252 { 253 public: DummyNode(gu::Config & conf,const size_t index,const gcomm::UUID & uuid,const std::list<Protolay * > & protos)254 DummyNode(gu::Config& conf, 255 const size_t index, 256 const gcomm::UUID& uuid, 257 const std::list<Protolay*>& protos) : 258 Toplay (conf), 259 index_ (index), 260 uuid_ (uuid), 261 protos_ (protos), 262 cvi_ (), 263 tr_ (), 264 curr_seq_(0) 265 { 266 gcomm_assert(protos_.empty() == false); 267 std::list<Protolay*>::iterator i, i_next; 268 i = i_next = protos_.begin(); 269 for (++i_next; i_next != protos_.end(); ++i, ++i_next) 270 { 271 gu_trace(gcomm::connect(*i, *i_next)); 272 } 273 gu_trace(gcomm::connect(*i, this)); 274 } 275 ~DummyNode()276 ~DummyNode() 277 { 278 try 279 { 280 std::list<Protolay*>::iterator i, i_next; 281 i = i_next = protos_.begin(); 282 for (++i_next; i_next != protos_.end(); ++i, ++i_next) 283 { 284 gu_trace(gcomm::disconnect(*i, *i_next)); 285 } 286 gu_trace(gcomm::disconnect(*i, this)); 287 std::for_each(protos_.begin(), protos_.end(), gu::DeleteObject()); 288 } 289 catch(std::exception& e) 290 { 291 log_fatal << e.what(); 292 abort(); 293 } 294 } 295 296 uuid() const297 const UUID& uuid() const { return uuid_; } 298 protos()299 std::list<Protolay*>& protos() { return protos_; } 300 index() const301 size_t index() const { return index_; } 302 connect(bool first)303 void connect(bool first) 304 { 305 gu_trace(std::for_each(protos_.rbegin(), protos_.rend(), 306 std::bind2nd( 307 std::mem_fun(&Protolay::connect), first))); 308 } 309 close(bool force=false)310 void close(bool force = false) 311 { 312 for (std::list<Protolay*>::iterator i = protos_.begin(); 313 i != protos_.end(); ++i) 314 { 315 (*i)->close(); 316 } 317 // gu_trace(std::for_each(protos.rbegin(), protos.rend(), 318 // std::mem_fun(&Protolay::close))); 319 } 320 321 close(const UUID & uuid)322 void close(const UUID& uuid) 323 { 324 for (std::list<Protolay*>::iterator i = protos_.begin(); 325 i != protos_.end(); ++i) 326 { 327 (*i)->close(uuid); 328 } 329 // gu_trace(std::for_each(protos.rbegin(), protos.rend(), 330 // std::mem_fun(&Protolay::close))); 331 } 332 send()333 void send() 334 { 335 const int64_t seq(curr_seq_); 336 gu::byte_t buf[sizeof(seq)]; 337 size_t sz; 338 gu_trace(sz = gu::serialize8(seq, buf, sizeof(buf), 0)); 339 Datagram dg(gu::Buffer(buf, buf + sz)); 340 int err = send_down(dg, ProtoDownMeta(0)); 341 if (err != 0) 342 { 343 log_debug << "failed to send: " << strerror(err); 344 } 345 else 346 { 347 ++curr_seq_; 348 } 349 } 350 create_datagram()351 Datagram create_datagram() 352 { 353 const int64_t seq(curr_seq_); 354 gu::byte_t buf[sizeof(seq)]; 355 size_t sz; 356 gu_trace(sz = gu::serialize8(seq, buf, sizeof(buf), 0)); 357 return Datagram (gu::Buffer(buf, buf + sz)); 358 } 359 trace() const360 const Trace& trace() const { return tr_; } 361 set_cvi(const ViewId & vi)362 void set_cvi(const ViewId& vi) 363 { 364 log_debug << uuid() << " setting cvi to " << vi; 365 cvi_ = vi; 366 } 367 in_cvi() const368 bool in_cvi() const 369 { 370 for (Trace::ViewTraceMap::const_reverse_iterator i( 371 tr_.view_traces().rbegin()); 372 i != tr_.view_traces().rend(); ++i) 373 { 374 if (i->first.uuid() == cvi_.uuid() && 375 i->first.type() == cvi_.type() && 376 i->first.seq() >= cvi_.seq()) 377 { 378 return true; 379 } 380 } 381 return false; 382 } 383 handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)384 void handle_up(const void* cid, const Datagram& rb, 385 const ProtoUpMeta& um) 386 { 387 if (rb.len() != 0) 388 { 389 gcomm_assert((um.source() == UUID::nil()) == false); 390 // assert(rb.header().size() == 0); 391 const gu::byte_t* begin(gcomm::begin(rb)); 392 const size_t available(gcomm::available(rb)); 393 394 395 // log_debug << um.source() << " " << uuid() 396 // << " " << available ; 397 // log_debug << rb.len() << " " << rb.offset() << " " 398 // << rb.header_len(); 399 if (available != 8) 400 { 401 log_info << "check_trace fail: " << available; 402 } 403 gcomm_assert(available == 8); 404 int64_t seq; 405 gu_trace(gu::unserialize8(begin, 406 available, 407 0, 408 seq)); 409 tr_.insert_msg(TraceMsg(um.source(), um.source_view_id(), 410 seq)); 411 } 412 else 413 { 414 gcomm_assert(um.has_view() == true); 415 tr_.insert_view(um.view()); 416 } 417 } 418 419 handle_timers()420 gu::datetime::Date handle_timers() 421 { 422 std::for_each(protos_.begin(), protos_.end(), 423 std::mem_fun(&Protolay::handle_timers)); 424 return gu::datetime::Date::max(); 425 } 426 427 private: 428 size_t index_; 429 UUID uuid_; 430 std::list<Protolay*> protos_; 431 ViewId cvi_; 432 Trace tr_; 433 int64_t curr_seq_; 434 }; 435 436 437 438 class ChannelMsg 439 { 440 public: ChannelMsg(const Datagram & rb,const UUID & source)441 ChannelMsg(const Datagram& rb, const UUID& source) : 442 rb_(rb), 443 source_(source) 444 { 445 } rb() const446 const Datagram& rb() const { return rb_; } source() const447 const UUID& source() const { return source_; } 448 private: 449 Datagram rb_; 450 UUID source_; 451 }; 452 453 454 class Channel : public Bottomlay 455 { 456 public: Channel(gu::Config & conf,const size_t ttl=1,const size_t latency=1,const double loss=1.)457 Channel(gu::Config& conf, 458 const size_t ttl = 1, 459 const size_t latency = 1, 460 const double loss = 1.) : 461 Bottomlay(conf), 462 ttl_(ttl), 463 latency_(latency), 464 loss_(loss), 465 queue_() 466 { } 467 468 469 ~Channel()470 ~Channel() { } 471 handle_down(Datagram & wb,const ProtoDownMeta & dm)472 int handle_down(Datagram& wb, const ProtoDownMeta& dm) 473 { 474 gcomm_assert((dm.source() == UUID::nil()) == false); 475 gu_trace(put(wb, dm.source())); 476 return 0; 477 } 478 479 void put(const Datagram& rb, const UUID& source); 480 ChannelMsg get(); set_ttl(const size_t t)481 void set_ttl(const size_t t) { ttl_ = t; } ttl() const482 size_t ttl() const { return ttl_; } set_latency(const size_t l)483 void set_latency(const size_t l) 484 { 485 gcomm_assert(l > 0); 486 latency_ = l; 487 } latency() const488 size_t latency() const { return latency_; } set_loss(const double l)489 void set_loss(const double l) { loss_ = l; } loss() const490 double loss() const { return loss_; } n_msgs() const491 size_t n_msgs() const 492 { 493 return queue_.size(); 494 } 495 private: 496 size_t ttl_; 497 size_t latency_; 498 double loss_; 499 std::deque<std::pair<size_t, ChannelMsg> > queue_; 500 }; 501 502 503 std::ostream& operator<<(std::ostream& os, const Channel& ch); 504 std::ostream& operator<<(std::ostream& os, const Channel* ch); 505 506 507 508 509 class MatrixElem 510 { 511 public: MatrixElem(const size_t ii,const size_t jj)512 MatrixElem(const size_t ii, const size_t jj) : ii_(ii), jj_(jj) { } ii() const513 size_t ii() const { return ii_; } jj() const514 size_t jj() const { return jj_; } operator <(const MatrixElem & cmp) const515 bool operator<(const MatrixElem& cmp) const 516 { 517 return (ii_ < cmp.ii_ || (ii_ == cmp.ii_ && jj_ < cmp.jj_)); 518 } 519 private: 520 size_t ii_; 521 size_t jj_; 522 }; 523 524 std::ostream& operator<<(std::ostream& os, const MatrixElem& me); 525 526 class ChannelMap : public Map<MatrixElem, Channel*> 527 { 528 public: 529 struct DeleteObject 530 { operator ()gcomm::ChannelMap::DeleteObject531 void operator()(ChannelMap::value_type& vt) 532 { 533 delete ChannelMap::value(vt); 534 } 535 }; 536 }; 537 class NodeMap : public Map<size_t, DummyNode*> { 538 public: 539 struct DeleteObject 540 { operator ()gcomm::NodeMap::DeleteObject541 void operator()(NodeMap::value_type& vt) 542 { 543 delete NodeMap::value(vt); 544 } 545 }; 546 547 }; 548 549 class PropagationMatrix 550 { 551 public: PropagationMatrix()552 PropagationMatrix() : tp_(), prop_() 553 { 554 // Some tests which deal with timer expiration require that 555 // the current time is far enough from zero. Start from 556 // 100 secs after zero, this should give enough headroom 557 // for all tests. 558 gu::datetime::SimClock::init(100*gu::datetime::Sec); 559 // Uncomment this to get logs with simulated timestamps. 560 // The low will be written into stderr. 561 // gu_log_cb = check_trace_log_cb; 562 } 563 ~PropagationMatrix(); 564 565 void insert_tp(DummyNode* t); 566 void set_latency(const size_t ii, const size_t jj, const size_t lat); 567 void set_loss(const size_t ii, const size_t jj, const double loss); 568 void split(const size_t ii, const size_t jj); 569 void merge(const size_t ii, const size_t jj, const double loss = 1.0); 570 void propagate_n(size_t n); 571 void propagate_until_empty(); 572 void propagate_until_cvi(bool handle_timers); 573 friend std::ostream& operator<<(std::ostream&,const PropagationMatrix&); 574 private: 575 void expire_timers(); 576 577 578 size_t count_channel_msgs() const; 579 bool all_in_cvi() const; 580 581 NodeMap tp_; 582 ChannelMap prop_; 583 }; 584 585 586 std::ostream& operator<<(std::ostream& os, const PropagationMatrix& prop); 587 588 // Cross check traces from vector of dummy nodes 589 void check_trace(const std::vector<DummyNode*>& nvec); 590 591 } // namespace gcomm 592