1 /* 2 * Copyright (C) 2009-2014 Codership Oy <info@codership.com> 3 * 4 * $Id$ 5 */ 6 7 /*! 8 * Classes for tracing views and messages 9 */ 10 11 #include "gu_uri.hpp" 12 #include "gu_datetime.hpp" 13 14 #include "gcomm/datagram.hpp" 15 #include "gcomm/uuid.hpp" 16 #include "gcomm/protolay.hpp" 17 #include "gcomm/protostack.hpp" 18 #include "gcomm/transport.hpp" 19 #include "gcomm/map.hpp" 20 #include "gcomm/util.hpp" 21 22 #include <vector> 23 #include <deque> 24 #include <functional> 25 26 gu::Config& check_trace_conf(); 27 28 extern "C" void check_trace_log_cb(int, const char*); 29 30 namespace gcomm 31 { 32 class TraceMsg 33 { 34 public: TraceMsg(const UUID & source=UUID::nil (),const ViewId & source_view_id=ViewId (),const int64_t seq=-1)35 TraceMsg(const UUID& source = UUID::nil(), 36 const ViewId& source_view_id = ViewId(), 37 const int64_t seq = -1) : 38 source_(source), 39 source_view_id_(source_view_id), 40 seq_(seq) 41 { } 42 source() const43 const UUID& source() const { return source_; } 44 source_view_id() const45 const ViewId& source_view_id() const { return source_view_id_; } 46 seq() const47 int64_t seq() const { return seq_; } 48 operator ==(const TraceMsg & cmp) const49 bool operator==(const TraceMsg& cmp) const 50 { 51 return (source_ == cmp.source_ && 52 source_view_id_ == cmp.source_view_id_ && 53 seq_ == cmp.seq_ ); 54 55 } 56 57 private: 58 UUID source_; 59 ViewId source_view_id_; 60 int64_t seq_; 61 }; 62 63 std::ostream& operator<<(std::ostream& os, const TraceMsg& msg); 64 65 class ViewTrace 66 { 67 public: ViewTrace(const View & view)68 ViewTrace(const View& view) : view_(view), msgs_() { } 69 insert_msg(const TraceMsg & msg)70 void insert_msg(const TraceMsg& msg) 71 { 72 switch (view_.type()) 73 { 74 case V_REG: 75 gcomm_assert(view_.id() == msg.source_view_id()); 76 gcomm_assert(contains(msg.source()) == true) 77 << "msg source " << msg.source() << " not int view " 78 << view_; 79 break; 80 case V_TRANS: 81 gcomm_assert(view_.id().uuid() == 82 msg.source_view_id().uuid() && 83 view_.id().seq() == 84 msg.source_view_id().seq()); 85 break; 86 case V_NON_PRIM: 87 break; 88 case V_PRIM: 89 gcomm_assert(view_.id() == msg.source_view_id()) 90 << " view id " << view_.id() 91 << " source view " << msg.source_view_id(); 92 gcomm_assert(contains(msg.source()) == true); 93 break; 94 case V_NONE: 95 gu_throw_fatal; 96 break; 97 } 98 99 if (view_.type() != V_NON_PRIM) 100 { 101 msgs_.push_back(msg); 102 } 103 } 104 view() const105 const View& view() const { return view_; } 106 msgs() const107 const std::deque<TraceMsg>& msgs() const { return msgs_; } 108 operator ==(const ViewTrace & cmp) const109 bool operator==(const ViewTrace& cmp) const 110 { 111 // Note: Cannot compare joining members since seen differently 112 // on different merging subsets 113 return (view_.members() == cmp.view_.members() && 114 view_.left() == cmp.view_.left() && 115 view_.partitioned() == cmp.view_.partitioned() && 116 msgs_ == cmp.msgs_ ); 117 } 118 private: 119 contains(const UUID & uuid) const120 bool contains(const UUID& uuid) const 121 { 122 return (view_.members().find(uuid) != view_.members().end() || 123 view_.left().find(uuid) != view_.left().end() || 124 view_.partitioned().find(uuid) !=view_.partitioned().end()); 125 } 126 127 View view_; 128 std::deque<TraceMsg> msgs_; 129 }; 130 131 std::ostream& operator<<(std::ostream& os, const ViewTrace& vtr); 132 133 134 class Trace 135 { 136 public: 137 class ViewTraceMap : public Map<ViewId, ViewTrace> { }; 138 Trace()139 Trace() : views_(), current_view_(views_.end()) { } 140 insert_view(const View & view)141 void insert_view(const View& view) 142 { 143 gu_trace(current_view_ = views_.insert_unique( 144 std::make_pair(view.id(), ViewTrace(view)))); 145 146 log_debug << view; 147 } insert_msg(const TraceMsg & msg)148 void insert_msg(const TraceMsg& msg) 149 { 150 gcomm_assert(current_view_ != views_.end()) 151 << "no view set before msg delivery"; 152 gu_trace(ViewTraceMap::value(current_view_).insert_msg(msg)); 153 } view_traces() const154 const ViewTraceMap& view_traces() const { return views_; } 155 current_view_trace() const156 const ViewTrace& current_view_trace() const 157 { 158 gcomm_assert(current_view_ != views_.end()); 159 return ViewTraceMap::value(current_view_); 160 } 161 162 private: 163 ViewTraceMap views_; 164 ViewTraceMap::iterator current_view_; 165 }; 166 167 168 std::ostream& operator<<(std::ostream& os, const Trace& tr); 169 170 class DummyTransport : public Transport 171 { 172 UUID uuid_; 173 std::deque<Datagram*> out_; 174 bool queue_; 175 176 public: 177 DummyTransport(const UUID & uuid=UUID::nil (),bool queue=true,const gu::URI & uri=gu::URI ("dummy:"))178 DummyTransport(const UUID& uuid = UUID::nil(), bool queue = true, 179 const gu::URI& uri = gu::URI("dummy:")) : 180 Transport(*std::auto_ptr<Protonet> 181 (Protonet::create(check_trace_conf())), uri), 182 uuid_(uuid), 183 out_(), 184 queue_(queue) 185 {} 186 ~DummyTransport()187 ~DummyTransport() 188 { 189 out_.clear(); 190 } 191 uuid() const192 const UUID& uuid() const { return uuid_; } 193 mtu() const194 size_t mtu() const { return (1U << 31); } 195 connect(bool first)196 void connect(bool first) { } 197 close(bool force)198 void close(bool force) { } close(const UUID &)199 void close(const UUID&) { } 200 connect()201 void connect() { } 202 listen()203 void listen() 204 { 205 gu_throw_fatal << "not implemented"; 206 } 207 accept()208 Transport *accept() 209 { 210 gu_throw_fatal << "not implemented"; 211 return 0; 212 } 213 handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)214 void handle_up(const void* cid, const Datagram& rb, 215 const ProtoUpMeta& um) 216 { 217 send_up(rb, um); 218 } 219 set_queueing(bool val)220 void set_queueing(bool val) { queue_ = val; } 221 handle_down(Datagram & wb,const ProtoDownMeta & dm)222 int handle_down(Datagram& wb, const ProtoDownMeta& dm) 223 { 224 if (queue_ == true) 225 { 226 // assert(wb.header().size() == 0); 227 out_.push_back(new Datagram(wb)); 228 return 0; 229 } 230 else 231 { 232 gu_trace(return send_down(wb, ProtoDownMeta(0xff, O_UNRELIABLE, uuid_))); 233 } 234 } 235 empty() const236 bool empty() const { return out_.empty(); } 237 out()238 Datagram* out() 239 { 240 if (out_.empty()) 241 { 242 return 0; 243 } 244 Datagram* rb = out_.front(); 245 out_.pop_front(); 246 return rb; 247 } 248 }; 249 250 251 class DummyNode : public Toplay 252 { 253 public: DummyNode(gu::Config & conf,const size_t index,const std::list<Protolay * > & protos)254 DummyNode(gu::Config& conf, 255 const size_t index, 256 const std::list<Protolay*>& protos) : 257 Toplay (conf), 258 index_ (index), 259 uuid_ (UUID(static_cast<int32_t>(index))), 260 protos_ (protos), 261 cvi_ (), 262 tr_ (), 263 curr_seq_(0) 264 { 265 gcomm_assert(protos_.empty() == false); 266 std::list<Protolay*>::iterator i, i_next; 267 i = i_next = protos_.begin(); 268 for (++i_next; i_next != protos_.end(); ++i, ++i_next) 269 { 270 gu_trace(gcomm::connect(*i, *i_next)); 271 } 272 gu_trace(gcomm::connect(*i, this)); 273 } 274 ~DummyNode()275 ~DummyNode() 276 { 277 try 278 { 279 std::list<Protolay*>::iterator i, i_next; 280 i = i_next = protos_.begin(); 281 for (++i_next; i_next != protos_.end(); ++i, ++i_next) 282 { 283 gu_trace(gcomm::disconnect(*i, *i_next)); 284 } 285 gu_trace(gcomm::disconnect(*i, this)); 286 std::for_each(protos_.begin(), protos_.end(), gu::DeleteObject()); 287 } 288 catch(std::exception& e) 289 { 290 log_fatal << e.what(); 291 abort(); 292 } 293 } 294 295 uuid() const296 const UUID& uuid() const { return uuid_; } 297 protos()298 std::list<Protolay*>& protos() { return protos_; } 299 index() const300 size_t index() const { return index_; } 301 connect(bool first)302 void connect(bool first) 303 { 304 gu_trace(std::for_each(protos_.rbegin(), protos_.rend(), 305 std::bind2nd( 306 std::mem_fun(&Protolay::connect), first))); 307 } 308 close(bool force=false)309 void close(bool force = false) 310 { 311 for (std::list<Protolay*>::iterator i = protos_.begin(); 312 i != protos_.end(); ++i) 313 { 314 (*i)->close(); 315 } 316 // gu_trace(std::for_each(protos.rbegin(), protos.rend(), 317 // std::mem_fun(&Protolay::close))); 318 } 319 320 close(const UUID & uuid)321 void close(const UUID& uuid) 322 { 323 for (std::list<Protolay*>::iterator i = protos_.begin(); 324 i != protos_.end(); ++i) 325 { 326 (*i)->close(uuid); 327 } 328 // gu_trace(std::for_each(protos.rbegin(), protos.rend(), 329 // std::mem_fun(&Protolay::close))); 330 } 331 send()332 void send() 333 { 334 const int64_t seq(curr_seq_); 335 gu::byte_t buf[sizeof(seq)]; 336 size_t sz; 337 gu_trace(sz = gu::serialize8(seq, buf, sizeof(buf), 0)); 338 Datagram dg(gu::Buffer(buf, buf + sz)); 339 int err = send_down(dg, ProtoDownMeta(0)); 340 if (err != 0) 341 { 342 log_debug << "failed to send: " << strerror(err); 343 } 344 else 345 { 346 ++curr_seq_; 347 } 348 } 349 create_datagram()350 Datagram create_datagram() 351 { 352 const int64_t seq(curr_seq_); 353 gu::byte_t buf[sizeof(seq)]; 354 size_t sz; 355 gu_trace(sz = gu::serialize8(seq, buf, sizeof(buf), 0)); 356 return Datagram (gu::Buffer(buf, buf + sz)); 357 } 358 trace() const359 const Trace& trace() const { return tr_; } 360 set_cvi(const ViewId & vi)361 void set_cvi(const ViewId& vi) 362 { 363 log_debug << uuid() << " setting cvi to " << vi; 364 cvi_ = vi; 365 } 366 in_cvi() const367 bool in_cvi() const 368 { 369 for (Trace::ViewTraceMap::const_reverse_iterator i( 370 tr_.view_traces().rbegin()); 371 i != tr_.view_traces().rend(); ++i) 372 { 373 if (i->first.uuid() == cvi_.uuid() && 374 i->first.type() == cvi_.type() && 375 i->first.seq() >= cvi_.seq()) 376 { 377 return true; 378 } 379 } 380 return false; 381 } 382 handle_up(const void * cid,const Datagram & rb,const ProtoUpMeta & um)383 void handle_up(const void* cid, const Datagram& rb, 384 const ProtoUpMeta& um) 385 { 386 if (rb.len() != 0) 387 { 388 gcomm_assert((um.source() == UUID::nil()) == false); 389 // assert(rb.header().size() == 0); 390 const gu::byte_t* begin(gcomm::begin(rb)); 391 const size_t available(gcomm::available(rb)); 392 393 394 // log_debug << um.source() << " " << uuid() 395 // << " " << available ; 396 // log_debug << rb.len() << " " << rb.offset() << " " 397 // << rb.header_len(); 398 if (available != 8) 399 { 400 log_info << "check_trace fail: " << available; 401 } 402 gcomm_assert(available == 8); 403 int64_t seq; 404 gu_trace(gu::unserialize8(begin, 405 available, 406 0, 407 seq)); 408 tr_.insert_msg(TraceMsg(um.source(), um.source_view_id(), 409 seq)); 410 } 411 else 412 { 413 gcomm_assert(um.has_view() == true); 414 tr_.insert_view(um.view()); 415 } 416 } 417 418 handle_timers()419 gu::datetime::Date handle_timers() 420 { 421 std::for_each(protos_.begin(), protos_.end(), 422 std::mem_fun(&Protolay::handle_timers)); 423 return gu::datetime::Date::max(); 424 } 425 426 private: 427 size_t index_; 428 UUID uuid_; 429 std::list<Protolay*> protos_; 430 ViewId cvi_; 431 Trace tr_; 432 int64_t curr_seq_; 433 }; 434 435 436 437 class ChannelMsg 438 { 439 public: ChannelMsg(const Datagram & rb,const UUID & source)440 ChannelMsg(const Datagram& rb, const UUID& source) : 441 rb_(rb), 442 source_(source) 443 { 444 } rb() const445 const Datagram& rb() const { return rb_; } source() const446 const UUID& source() const { return source_; } 447 private: 448 Datagram rb_; 449 UUID source_; 450 }; 451 452 453 class Channel : public Bottomlay 454 { 455 public: Channel(gu::Config & conf,const size_t ttl=1,const size_t latency=1,const double loss=1.)456 Channel(gu::Config& conf, 457 const size_t ttl = 1, 458 const size_t latency = 1, 459 const double loss = 1.) : 460 Bottomlay(conf), 461 ttl_(ttl), 462 latency_(latency), 463 loss_(loss), 464 queue_() 465 { } 466 467 468 ~Channel()469 ~Channel() { } 470 handle_down(Datagram & wb,const ProtoDownMeta & dm)471 int handle_down(Datagram& wb, const ProtoDownMeta& dm) 472 { 473 gcomm_assert((dm.source() == UUID::nil()) == false); 474 gu_trace(put(wb, dm.source())); 475 return 0; 476 } 477 478 void put(const Datagram& rb, const UUID& source); 479 ChannelMsg get(); set_ttl(const size_t t)480 void set_ttl(const size_t t) { ttl_ = t; } ttl() const481 size_t ttl() const { return ttl_; } set_latency(const size_t l)482 void set_latency(const size_t l) 483 { 484 gcomm_assert(l > 0); 485 latency_ = l; 486 } latency() const487 size_t latency() const { return latency_; } set_loss(const double l)488 void set_loss(const double l) { loss_ = l; } loss() const489 double loss() const { return loss_; } n_msgs() const490 size_t n_msgs() const 491 { 492 return queue_.size(); 493 } 494 private: 495 size_t ttl_; 496 size_t latency_; 497 double loss_; 498 std::deque<std::pair<size_t, ChannelMsg> > queue_; 499 }; 500 501 502 std::ostream& operator<<(std::ostream& os, const Channel& ch); 503 std::ostream& operator<<(std::ostream& os, const Channel* ch); 504 505 506 507 508 class MatrixElem 509 { 510 public: MatrixElem(const size_t ii,const size_t jj)511 MatrixElem(const size_t ii, const size_t jj) : ii_(ii), jj_(jj) { } ii() const512 size_t ii() const { return ii_; } jj() const513 size_t jj() const { return jj_; } operator <(const MatrixElem & cmp) const514 bool operator<(const MatrixElem& cmp) const 515 { 516 return (ii_ < cmp.ii_ || (ii_ == cmp.ii_ && jj_ < cmp.jj_)); 517 } 518 private: 519 size_t ii_; 520 size_t jj_; 521 }; 522 523 std::ostream& operator<<(std::ostream& os, const MatrixElem& me); 524 525 class ChannelMap : public Map<MatrixElem, Channel*> 526 { 527 public: 528 struct DeleteObject 529 { operator ()gcomm::ChannelMap::DeleteObject530 void operator()(ChannelMap::value_type& vt) 531 { 532 delete ChannelMap::value(vt); 533 } 534 }; 535 }; 536 class NodeMap : public Map<size_t, DummyNode*> { 537 public: 538 struct DeleteObject 539 { operator ()gcomm::NodeMap::DeleteObject540 void operator()(NodeMap::value_type& vt) 541 { 542 delete NodeMap::value(vt); 543 } 544 }; 545 546 }; 547 548 class PropagationMatrix 549 { 550 public: PropagationMatrix()551 PropagationMatrix() : tp_(), prop_() 552 { 553 // Some tests which deal with timer expiration require that 554 // the current time is far enough from zero. Start from 555 // 100 secs after zero, this should give enough headroom 556 // for all tests. 557 gu::datetime::SimClock::init(100*gu::datetime::Sec); 558 // Uncomment this to get logs with simulated timestamps. 559 // The low will be written into stderr. 560 // gu_log_cb = check_trace_log_cb; 561 } 562 ~PropagationMatrix(); 563 564 void insert_tp(DummyNode* t); 565 void set_latency(const size_t ii, const size_t jj, const size_t lat); 566 void set_loss(const size_t ii, const size_t jj, const double loss); 567 void split(const size_t ii, const size_t jj); 568 void merge(const size_t ii, const size_t jj, const double loss = 1.0); 569 void propagate_n(size_t n); 570 void propagate_until_empty(); 571 void propagate_until_cvi(bool handle_timers); 572 friend std::ostream& operator<<(std::ostream&,const PropagationMatrix&); 573 private: 574 void expire_timers(); 575 576 577 size_t count_channel_msgs() const; 578 bool all_in_cvi() const; 579 580 NodeMap tp_; 581 ChannelMap prop_; 582 }; 583 584 585 std::ostream& operator<<(std::ostream& os, const PropagationMatrix& prop); 586 587 // Cross check traces from vector of dummy nodes 588 void check_trace(const std::vector<DummyNode*>& nvec); 589 590 } // namespace gcomm 591