1package pubsub 2 3import ( 4 "time" 5 6 "github.com/libp2p/go-libp2p-core/peer" 7 "github.com/libp2p/go-libp2p-core/protocol" 8 9 pb "github.com/libp2p/go-libp2p-pubsub/pb" 10) 11 12// Generic event tracer interface 13type EventTracer interface { 14 Trace(evt *pb.TraceEvent) 15} 16 17// internal interface for score tracing 18type internalTracer interface { 19 AddPeer(p peer.ID, proto protocol.ID) 20 RemovePeer(p peer.ID) 21 Join(topic string) 22 Leave(topic string) 23 Graft(p peer.ID, topic string) 24 Prune(p peer.ID, topic string) 25 ValidateMessage(msg *Message) 26 DeliverMessage(msg *Message) 27 RejectMessage(msg *Message, reason string) 28 DuplicateMessage(msg *Message) 29 ThrottlePeer(p peer.ID) 30} 31 32// pubsub tracer details 33type pubsubTracer struct { 34 tracer EventTracer 35 internal []internalTracer 36 pid peer.ID 37 msgID MsgIdFunction 38} 39 40func (t *pubsubTracer) PublishMessage(msg *Message) { 41 if t == nil { 42 return 43 } 44 45 if t.tracer == nil { 46 return 47 } 48 49 now := time.Now().UnixNano() 50 evt := &pb.TraceEvent{ 51 Type: pb.TraceEvent_PUBLISH_MESSAGE.Enum(), 52 PeerID: []byte(t.pid), 53 Timestamp: &now, 54 PublishMessage: &pb.TraceEvent_PublishMessage{ 55 MessageID: []byte(t.msgID(msg.Message)), 56 Topic: msg.Message.Topic, 57 }, 58 } 59 60 t.tracer.Trace(evt) 61} 62 63func (t *pubsubTracer) ValidateMessage(msg *Message) { 64 if t == nil { 65 return 66 } 67 68 if msg.ReceivedFrom != t.pid { 69 for _, tr := range t.internal { 70 tr.ValidateMessage(msg) 71 } 72 } 73} 74 75func (t *pubsubTracer) RejectMessage(msg *Message, reason string) { 76 if t == nil { 77 return 78 } 79 80 if msg.ReceivedFrom != t.pid { 81 for _, tr := range t.internal { 82 tr.RejectMessage(msg, reason) 83 } 84 } 85 86 if t.tracer == nil { 87 return 88 } 89 90 now := time.Now().UnixNano() 91 evt := &pb.TraceEvent{ 92 Type: pb.TraceEvent_REJECT_MESSAGE.Enum(), 93 PeerID: []byte(t.pid), 94 Timestamp: &now, 95 RejectMessage: &pb.TraceEvent_RejectMessage{ 96 MessageID: []byte(t.msgID(msg.Message)), 97 ReceivedFrom: []byte(msg.ReceivedFrom), 98 Reason: &reason, 99 Topic: msg.Topic, 100 }, 101 } 102 103 t.tracer.Trace(evt) 104} 105 106func (t *pubsubTracer) DuplicateMessage(msg *Message) { 107 if t == nil { 108 return 109 } 110 111 if msg.ReceivedFrom != t.pid { 112 for _, tr := range t.internal { 113 tr.DuplicateMessage(msg) 114 } 115 } 116 117 if t.tracer == nil { 118 return 119 } 120 121 now := time.Now().UnixNano() 122 evt := &pb.TraceEvent{ 123 Type: pb.TraceEvent_DUPLICATE_MESSAGE.Enum(), 124 PeerID: []byte(t.pid), 125 Timestamp: &now, 126 DuplicateMessage: &pb.TraceEvent_DuplicateMessage{ 127 MessageID: []byte(t.msgID(msg.Message)), 128 ReceivedFrom: []byte(msg.ReceivedFrom), 129 Topic: msg.Topic, 130 }, 131 } 132 133 t.tracer.Trace(evt) 134} 135 136func (t *pubsubTracer) DeliverMessage(msg *Message) { 137 if t == nil { 138 return 139 } 140 141 if msg.ReceivedFrom != t.pid { 142 for _, tr := range t.internal { 143 tr.DeliverMessage(msg) 144 } 145 } 146 147 if t.tracer == nil { 148 return 149 } 150 151 now := time.Now().UnixNano() 152 evt := &pb.TraceEvent{ 153 Type: pb.TraceEvent_DELIVER_MESSAGE.Enum(), 154 PeerID: []byte(t.pid), 155 Timestamp: &now, 156 DeliverMessage: &pb.TraceEvent_DeliverMessage{ 157 MessageID: []byte(t.msgID(msg.Message)), 158 Topic: msg.Topic, 159 ReceivedFrom: []byte(msg.ReceivedFrom), 160 }, 161 } 162 163 t.tracer.Trace(evt) 164} 165 166func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) { 167 if t == nil { 168 return 169 } 170 171 for _, tr := range t.internal { 172 tr.AddPeer(p, proto) 173 } 174 175 if t.tracer == nil { 176 return 177 } 178 179 protoStr := string(proto) 180 now := time.Now().UnixNano() 181 evt := &pb.TraceEvent{ 182 Type: pb.TraceEvent_ADD_PEER.Enum(), 183 PeerID: []byte(t.pid), 184 Timestamp: &now, 185 AddPeer: &pb.TraceEvent_AddPeer{ 186 PeerID: []byte(p), 187 Proto: &protoStr, 188 }, 189 } 190 191 t.tracer.Trace(evt) 192} 193 194func (t *pubsubTracer) RemovePeer(p peer.ID) { 195 if t == nil { 196 return 197 } 198 199 for _, tr := range t.internal { 200 tr.RemovePeer(p) 201 } 202 203 if t.tracer == nil { 204 return 205 } 206 207 now := time.Now().UnixNano() 208 evt := &pb.TraceEvent{ 209 Type: pb.TraceEvent_REMOVE_PEER.Enum(), 210 PeerID: []byte(t.pid), 211 Timestamp: &now, 212 RemovePeer: &pb.TraceEvent_RemovePeer{ 213 PeerID: []byte(p), 214 }, 215 } 216 217 t.tracer.Trace(evt) 218} 219 220func (t *pubsubTracer) RecvRPC(rpc *RPC) { 221 if t == nil { 222 return 223 } 224 225 if t.tracer == nil { 226 return 227 } 228 229 now := time.Now().UnixNano() 230 evt := &pb.TraceEvent{ 231 Type: pb.TraceEvent_RECV_RPC.Enum(), 232 PeerID: []byte(t.pid), 233 Timestamp: &now, 234 RecvRPC: &pb.TraceEvent_RecvRPC{ 235 ReceivedFrom: []byte(rpc.from), 236 Meta: t.traceRPCMeta(rpc), 237 }, 238 } 239 240 t.tracer.Trace(evt) 241} 242 243func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) { 244 if t == nil { 245 return 246 } 247 248 if t.tracer == nil { 249 return 250 } 251 252 now := time.Now().UnixNano() 253 evt := &pb.TraceEvent{ 254 Type: pb.TraceEvent_SEND_RPC.Enum(), 255 PeerID: []byte(t.pid), 256 Timestamp: &now, 257 SendRPC: &pb.TraceEvent_SendRPC{ 258 SendTo: []byte(p), 259 Meta: t.traceRPCMeta(rpc), 260 }, 261 } 262 263 t.tracer.Trace(evt) 264} 265 266func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) { 267 if t == nil { 268 return 269 } 270 271 if t.tracer == nil { 272 return 273 } 274 275 now := time.Now().UnixNano() 276 evt := &pb.TraceEvent{ 277 Type: pb.TraceEvent_DROP_RPC.Enum(), 278 PeerID: []byte(t.pid), 279 Timestamp: &now, 280 DropRPC: &pb.TraceEvent_DropRPC{ 281 SendTo: []byte(p), 282 Meta: t.traceRPCMeta(rpc), 283 }, 284 } 285 286 t.tracer.Trace(evt) 287} 288 289func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { 290 rpcMeta := new(pb.TraceEvent_RPCMeta) 291 292 var msgs []*pb.TraceEvent_MessageMeta 293 for _, m := range rpc.Publish { 294 msgs = append(msgs, &pb.TraceEvent_MessageMeta{ 295 MessageID: []byte(t.msgID(m)), 296 Topic: m.Topic, 297 }) 298 } 299 rpcMeta.Messages = msgs 300 301 var subs []*pb.TraceEvent_SubMeta 302 for _, sub := range rpc.Subscriptions { 303 subs = append(subs, &pb.TraceEvent_SubMeta{ 304 Subscribe: sub.Subscribe, 305 Topic: sub.Topicid, 306 }) 307 } 308 rpcMeta.Subscription = subs 309 310 if rpc.Control != nil { 311 var ihave []*pb.TraceEvent_ControlIHaveMeta 312 for _, ctl := range rpc.Control.Ihave { 313 var mids [][]byte 314 for _, mid := range ctl.MessageIDs { 315 mids = append(mids, []byte(mid)) 316 } 317 ihave = append(ihave, &pb.TraceEvent_ControlIHaveMeta{ 318 Topic: ctl.TopicID, 319 MessageIDs: mids, 320 }) 321 } 322 323 var iwant []*pb.TraceEvent_ControlIWantMeta 324 for _, ctl := range rpc.Control.Iwant { 325 var mids [][]byte 326 for _, mid := range ctl.MessageIDs { 327 mids = append(mids, []byte(mid)) 328 } 329 iwant = append(iwant, &pb.TraceEvent_ControlIWantMeta{ 330 MessageIDs: mids, 331 }) 332 } 333 334 var graft []*pb.TraceEvent_ControlGraftMeta 335 for _, ctl := range rpc.Control.Graft { 336 graft = append(graft, &pb.TraceEvent_ControlGraftMeta{ 337 Topic: ctl.TopicID, 338 }) 339 } 340 341 var prune []*pb.TraceEvent_ControlPruneMeta 342 for _, ctl := range rpc.Control.Prune { 343 peers := make([][]byte, 0, len(ctl.Peers)) 344 for _, pi := range ctl.Peers { 345 peers = append(peers, pi.PeerID) 346 } 347 prune = append(prune, &pb.TraceEvent_ControlPruneMeta{ 348 Topic: ctl.TopicID, 349 Peers: peers, 350 }) 351 } 352 353 rpcMeta.Control = &pb.TraceEvent_ControlMeta{ 354 Ihave: ihave, 355 Iwant: iwant, 356 Graft: graft, 357 Prune: prune, 358 } 359 } 360 361 return rpcMeta 362} 363 364func (t *pubsubTracer) Join(topic string) { 365 if t == nil { 366 return 367 } 368 369 for _, tr := range t.internal { 370 tr.Join(topic) 371 } 372 373 if t.tracer == nil { 374 return 375 } 376 377 now := time.Now().UnixNano() 378 evt := &pb.TraceEvent{ 379 Type: pb.TraceEvent_JOIN.Enum(), 380 PeerID: []byte(t.pid), 381 Timestamp: &now, 382 Join: &pb.TraceEvent_Join{ 383 Topic: &topic, 384 }, 385 } 386 387 t.tracer.Trace(evt) 388} 389 390func (t *pubsubTracer) Leave(topic string) { 391 if t == nil { 392 return 393 } 394 395 for _, tr := range t.internal { 396 tr.Leave(topic) 397 } 398 399 if t.tracer == nil { 400 return 401 } 402 403 now := time.Now().UnixNano() 404 evt := &pb.TraceEvent{ 405 Type: pb.TraceEvent_LEAVE.Enum(), 406 PeerID: []byte(t.pid), 407 Timestamp: &now, 408 Leave: &pb.TraceEvent_Leave{ 409 Topic: &topic, 410 }, 411 } 412 413 t.tracer.Trace(evt) 414} 415 416func (t *pubsubTracer) Graft(p peer.ID, topic string) { 417 if t == nil { 418 return 419 } 420 421 for _, tr := range t.internal { 422 tr.Graft(p, topic) 423 } 424 425 if t.tracer == nil { 426 return 427 } 428 429 now := time.Now().UnixNano() 430 evt := &pb.TraceEvent{ 431 Type: pb.TraceEvent_GRAFT.Enum(), 432 PeerID: []byte(t.pid), 433 Timestamp: &now, 434 Graft: &pb.TraceEvent_Graft{ 435 PeerID: []byte(p), 436 Topic: &topic, 437 }, 438 } 439 440 t.tracer.Trace(evt) 441} 442 443func (t *pubsubTracer) Prune(p peer.ID, topic string) { 444 if t == nil { 445 return 446 } 447 448 for _, tr := range t.internal { 449 tr.Prune(p, topic) 450 } 451 452 if t.tracer == nil { 453 return 454 } 455 456 now := time.Now().UnixNano() 457 evt := &pb.TraceEvent{ 458 Type: pb.TraceEvent_PRUNE.Enum(), 459 PeerID: []byte(t.pid), 460 Timestamp: &now, 461 Prune: &pb.TraceEvent_Prune{ 462 PeerID: []byte(p), 463 Topic: &topic, 464 }, 465 } 466 467 t.tracer.Trace(evt) 468} 469 470func (t *pubsubTracer) ThrottlePeer(p peer.ID) { 471 if t == nil { 472 return 473 } 474 475 for _, tr := range t.internal { 476 tr.ThrottlePeer(p) 477 } 478} 479