1package pubsub
2
3import (
4	"time"
5
6	"github.com/libp2p/go-libp2p-core/peer"
7	"github.com/libp2p/go-libp2p-core/protocol"
8
9	pb "github.com/libp2p/go-libp2p-pubsub/pb"
10)
11
12// Generic event tracer interface
13type EventTracer interface {
14	Trace(evt *pb.TraceEvent)
15}
16
17// internal interface for score tracing
18type internalTracer interface {
19	AddPeer(p peer.ID, proto protocol.ID)
20	RemovePeer(p peer.ID)
21	Join(topic string)
22	Leave(topic string)
23	Graft(p peer.ID, topic string)
24	Prune(p peer.ID, topic string)
25	ValidateMessage(msg *Message)
26	DeliverMessage(msg *Message)
27	RejectMessage(msg *Message, reason string)
28	DuplicateMessage(msg *Message)
29	ThrottlePeer(p peer.ID)
30}
31
32// pubsub tracer details
33type pubsubTracer struct {
34	tracer   EventTracer
35	internal []internalTracer
36	pid      peer.ID
37	msgID    MsgIdFunction
38}
39
40func (t *pubsubTracer) PublishMessage(msg *Message) {
41	if t == nil {
42		return
43	}
44
45	if t.tracer == nil {
46		return
47	}
48
49	now := time.Now().UnixNano()
50	evt := &pb.TraceEvent{
51		Type:      pb.TraceEvent_PUBLISH_MESSAGE.Enum(),
52		PeerID:    []byte(t.pid),
53		Timestamp: &now,
54		PublishMessage: &pb.TraceEvent_PublishMessage{
55			MessageID: []byte(t.msgID(msg.Message)),
56			Topic:     msg.Message.Topic,
57		},
58	}
59
60	t.tracer.Trace(evt)
61}
62
63func (t *pubsubTracer) ValidateMessage(msg *Message) {
64	if t == nil {
65		return
66	}
67
68	if msg.ReceivedFrom != t.pid {
69		for _, tr := range t.internal {
70			tr.ValidateMessage(msg)
71		}
72	}
73}
74
75func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
76	if t == nil {
77		return
78	}
79
80	if msg.ReceivedFrom != t.pid {
81		for _, tr := range t.internal {
82			tr.RejectMessage(msg, reason)
83		}
84	}
85
86	if t.tracer == nil {
87		return
88	}
89
90	now := time.Now().UnixNano()
91	evt := &pb.TraceEvent{
92		Type:      pb.TraceEvent_REJECT_MESSAGE.Enum(),
93		PeerID:    []byte(t.pid),
94		Timestamp: &now,
95		RejectMessage: &pb.TraceEvent_RejectMessage{
96			MessageID:    []byte(t.msgID(msg.Message)),
97			ReceivedFrom: []byte(msg.ReceivedFrom),
98			Reason:       &reason,
99			Topic:        msg.Topic,
100		},
101	}
102
103	t.tracer.Trace(evt)
104}
105
106func (t *pubsubTracer) DuplicateMessage(msg *Message) {
107	if t == nil {
108		return
109	}
110
111	if msg.ReceivedFrom != t.pid {
112		for _, tr := range t.internal {
113			tr.DuplicateMessage(msg)
114		}
115	}
116
117	if t.tracer == nil {
118		return
119	}
120
121	now := time.Now().UnixNano()
122	evt := &pb.TraceEvent{
123		Type:      pb.TraceEvent_DUPLICATE_MESSAGE.Enum(),
124		PeerID:    []byte(t.pid),
125		Timestamp: &now,
126		DuplicateMessage: &pb.TraceEvent_DuplicateMessage{
127			MessageID:    []byte(t.msgID(msg.Message)),
128			ReceivedFrom: []byte(msg.ReceivedFrom),
129			Topic:        msg.Topic,
130		},
131	}
132
133	t.tracer.Trace(evt)
134}
135
136func (t *pubsubTracer) DeliverMessage(msg *Message) {
137	if t == nil {
138		return
139	}
140
141	if msg.ReceivedFrom != t.pid {
142		for _, tr := range t.internal {
143			tr.DeliverMessage(msg)
144		}
145	}
146
147	if t.tracer == nil {
148		return
149	}
150
151	now := time.Now().UnixNano()
152	evt := &pb.TraceEvent{
153		Type:      pb.TraceEvent_DELIVER_MESSAGE.Enum(),
154		PeerID:    []byte(t.pid),
155		Timestamp: &now,
156		DeliverMessage: &pb.TraceEvent_DeliverMessage{
157			MessageID:    []byte(t.msgID(msg.Message)),
158			Topic:        msg.Topic,
159			ReceivedFrom: []byte(msg.ReceivedFrom),
160		},
161	}
162
163	t.tracer.Trace(evt)
164}
165
166func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {
167	if t == nil {
168		return
169	}
170
171	for _, tr := range t.internal {
172		tr.AddPeer(p, proto)
173	}
174
175	if t.tracer == nil {
176		return
177	}
178
179	protoStr := string(proto)
180	now := time.Now().UnixNano()
181	evt := &pb.TraceEvent{
182		Type:      pb.TraceEvent_ADD_PEER.Enum(),
183		PeerID:    []byte(t.pid),
184		Timestamp: &now,
185		AddPeer: &pb.TraceEvent_AddPeer{
186			PeerID: []byte(p),
187			Proto:  &protoStr,
188		},
189	}
190
191	t.tracer.Trace(evt)
192}
193
194func (t *pubsubTracer) RemovePeer(p peer.ID) {
195	if t == nil {
196		return
197	}
198
199	for _, tr := range t.internal {
200		tr.RemovePeer(p)
201	}
202
203	if t.tracer == nil {
204		return
205	}
206
207	now := time.Now().UnixNano()
208	evt := &pb.TraceEvent{
209		Type:      pb.TraceEvent_REMOVE_PEER.Enum(),
210		PeerID:    []byte(t.pid),
211		Timestamp: &now,
212		RemovePeer: &pb.TraceEvent_RemovePeer{
213			PeerID: []byte(p),
214		},
215	}
216
217	t.tracer.Trace(evt)
218}
219
220func (t *pubsubTracer) RecvRPC(rpc *RPC) {
221	if t == nil {
222		return
223	}
224
225	if t.tracer == nil {
226		return
227	}
228
229	now := time.Now().UnixNano()
230	evt := &pb.TraceEvent{
231		Type:      pb.TraceEvent_RECV_RPC.Enum(),
232		PeerID:    []byte(t.pid),
233		Timestamp: &now,
234		RecvRPC: &pb.TraceEvent_RecvRPC{
235			ReceivedFrom: []byte(rpc.from),
236			Meta:         t.traceRPCMeta(rpc),
237		},
238	}
239
240	t.tracer.Trace(evt)
241}
242
243func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) {
244	if t == nil {
245		return
246	}
247
248	if t.tracer == nil {
249		return
250	}
251
252	now := time.Now().UnixNano()
253	evt := &pb.TraceEvent{
254		Type:      pb.TraceEvent_SEND_RPC.Enum(),
255		PeerID:    []byte(t.pid),
256		Timestamp: &now,
257		SendRPC: &pb.TraceEvent_SendRPC{
258			SendTo: []byte(p),
259			Meta:   t.traceRPCMeta(rpc),
260		},
261	}
262
263	t.tracer.Trace(evt)
264}
265
266func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
267	if t == nil {
268		return
269	}
270
271	if t.tracer == nil {
272		return
273	}
274
275	now := time.Now().UnixNano()
276	evt := &pb.TraceEvent{
277		Type:      pb.TraceEvent_DROP_RPC.Enum(),
278		PeerID:    []byte(t.pid),
279		Timestamp: &now,
280		DropRPC: &pb.TraceEvent_DropRPC{
281			SendTo: []byte(p),
282			Meta:   t.traceRPCMeta(rpc),
283		},
284	}
285
286	t.tracer.Trace(evt)
287}
288
289func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta {
290	rpcMeta := new(pb.TraceEvent_RPCMeta)
291
292	var msgs []*pb.TraceEvent_MessageMeta
293	for _, m := range rpc.Publish {
294		msgs = append(msgs, &pb.TraceEvent_MessageMeta{
295			MessageID: []byte(t.msgID(m)),
296			Topic:     m.Topic,
297		})
298	}
299	rpcMeta.Messages = msgs
300
301	var subs []*pb.TraceEvent_SubMeta
302	for _, sub := range rpc.Subscriptions {
303		subs = append(subs, &pb.TraceEvent_SubMeta{
304			Subscribe: sub.Subscribe,
305			Topic:     sub.Topicid,
306		})
307	}
308	rpcMeta.Subscription = subs
309
310	if rpc.Control != nil {
311		var ihave []*pb.TraceEvent_ControlIHaveMeta
312		for _, ctl := range rpc.Control.Ihave {
313			var mids [][]byte
314			for _, mid := range ctl.MessageIDs {
315				mids = append(mids, []byte(mid))
316			}
317			ihave = append(ihave, &pb.TraceEvent_ControlIHaveMeta{
318				Topic:      ctl.TopicID,
319				MessageIDs: mids,
320			})
321		}
322
323		var iwant []*pb.TraceEvent_ControlIWantMeta
324		for _, ctl := range rpc.Control.Iwant {
325			var mids [][]byte
326			for _, mid := range ctl.MessageIDs {
327				mids = append(mids, []byte(mid))
328			}
329			iwant = append(iwant, &pb.TraceEvent_ControlIWantMeta{
330				MessageIDs: mids,
331			})
332		}
333
334		var graft []*pb.TraceEvent_ControlGraftMeta
335		for _, ctl := range rpc.Control.Graft {
336			graft = append(graft, &pb.TraceEvent_ControlGraftMeta{
337				Topic: ctl.TopicID,
338			})
339		}
340
341		var prune []*pb.TraceEvent_ControlPruneMeta
342		for _, ctl := range rpc.Control.Prune {
343			peers := make([][]byte, 0, len(ctl.Peers))
344			for _, pi := range ctl.Peers {
345				peers = append(peers, pi.PeerID)
346			}
347			prune = append(prune, &pb.TraceEvent_ControlPruneMeta{
348				Topic: ctl.TopicID,
349				Peers: peers,
350			})
351		}
352
353		rpcMeta.Control = &pb.TraceEvent_ControlMeta{
354			Ihave: ihave,
355			Iwant: iwant,
356			Graft: graft,
357			Prune: prune,
358		}
359	}
360
361	return rpcMeta
362}
363
364func (t *pubsubTracer) Join(topic string) {
365	if t == nil {
366		return
367	}
368
369	for _, tr := range t.internal {
370		tr.Join(topic)
371	}
372
373	if t.tracer == nil {
374		return
375	}
376
377	now := time.Now().UnixNano()
378	evt := &pb.TraceEvent{
379		Type:      pb.TraceEvent_JOIN.Enum(),
380		PeerID:    []byte(t.pid),
381		Timestamp: &now,
382		Join: &pb.TraceEvent_Join{
383			Topic: &topic,
384		},
385	}
386
387	t.tracer.Trace(evt)
388}
389
390func (t *pubsubTracer) Leave(topic string) {
391	if t == nil {
392		return
393	}
394
395	for _, tr := range t.internal {
396		tr.Leave(topic)
397	}
398
399	if t.tracer == nil {
400		return
401	}
402
403	now := time.Now().UnixNano()
404	evt := &pb.TraceEvent{
405		Type:      pb.TraceEvent_LEAVE.Enum(),
406		PeerID:    []byte(t.pid),
407		Timestamp: &now,
408		Leave: &pb.TraceEvent_Leave{
409			Topic: &topic,
410		},
411	}
412
413	t.tracer.Trace(evt)
414}
415
416func (t *pubsubTracer) Graft(p peer.ID, topic string) {
417	if t == nil {
418		return
419	}
420
421	for _, tr := range t.internal {
422		tr.Graft(p, topic)
423	}
424
425	if t.tracer == nil {
426		return
427	}
428
429	now := time.Now().UnixNano()
430	evt := &pb.TraceEvent{
431		Type:      pb.TraceEvent_GRAFT.Enum(),
432		PeerID:    []byte(t.pid),
433		Timestamp: &now,
434		Graft: &pb.TraceEvent_Graft{
435			PeerID: []byte(p),
436			Topic:  &topic,
437		},
438	}
439
440	t.tracer.Trace(evt)
441}
442
443func (t *pubsubTracer) Prune(p peer.ID, topic string) {
444	if t == nil {
445		return
446	}
447
448	for _, tr := range t.internal {
449		tr.Prune(p, topic)
450	}
451
452	if t.tracer == nil {
453		return
454	}
455
456	now := time.Now().UnixNano()
457	evt := &pb.TraceEvent{
458		Type:      pb.TraceEvent_PRUNE.Enum(),
459		PeerID:    []byte(t.pid),
460		Timestamp: &now,
461		Prune: &pb.TraceEvent_Prune{
462			PeerID: []byte(p),
463			Topic:  &topic,
464		},
465	}
466
467	t.tracer.Trace(evt)
468}
469
470func (t *pubsubTracer) ThrottlePeer(p peer.ID) {
471	if t == nil {
472		return
473	}
474
475	for _, tr := range t.internal {
476		tr.ThrottlePeer(p)
477	}
478}
479