1package pubsub
2
3import (
4	"compress/gzip"
5	"context"
6	"encoding/json"
7	"io"
8	"os"
9	"sync"
10	"time"
11
12	pb "github.com/libp2p/go-libp2p-pubsub/pb"
13
14	"github.com/libp2p/go-libp2p-core/host"
15	"github.com/libp2p/go-libp2p-core/network"
16	"github.com/libp2p/go-libp2p-core/peer"
17	"github.com/libp2p/go-libp2p-core/peerstore"
18	"github.com/libp2p/go-libp2p-core/protocol"
19
20	"github.com/libp2p/go-msgio/protoio"
21)
22
var (
	// TraceBufferSize caps how many pending events a lossy tracer (such as the
	// one built by NewRemoteTracer) keeps buffered; once the buffer is full,
	// newly traced events are dropped instead of queued.
	TraceBufferSize = 64 << 10 // 64K ought to be enough for everyone; famous last words.

	// MinTraceBatchSize is the number of pending events RemoteTracer tries to
	// accumulate before sending a batch; it waits at most about one second for
	// the batch to fill up.
	MinTraceBatchSize = 16
)
25
// Reasons recorded when a message is rejected.
const (
	// Rejections keyed on the sending peer, the message source or its origin.
	// NOTE(review): "Blacklsted" is misspelled; renaming requires updating
	// every user of this identifier in the package.
	rejectBlacklstedPeer    = "blacklisted peer"
	rejectBlacklistedSource = "blacklisted source"
	rejectSelfOrigin        = "self originated message"

	// Rejections from signature / authentication checks.
	rejectMissingSignature    = "missing signature"
	rejectUnexpectedSignature = "unexpected signature"
	rejectUnexpectedAuthInfo  = "unexpected auth info"
	rejectInvalidSignature    = "invalid signature"

	// Rejections from the validation pipeline.
	rejectValidationQueueFull = "validation queue full"
	rejectValidationThrottled = "validation throttled"
	rejectValidationFailed    = "validation failed"
	rejectValidationIgnored   = "validation ignored"
)
40
// basicTracer holds the state shared by the tracers in this file: a buffer of
// pending events filled by Trace and drained by each tracer's doWrite
// goroutine, plus the channel used to wake that goroutine up.
type basicTracer struct {
	// ch wakes the writer goroutine when events are pending; Trace sends on it
	// without blocking and Close closes it to tell the writer to finish.
	ch     chan struct{}
	// mx guards buf and closed.
	mx     sync.Mutex
	// buf holds events queued by Trace that the writer has not taken yet.
	buf    []*pb.TraceEvent
	// lossy makes Trace drop new events once buf reaches the TraceBufferSize
	// limit instead of letting it grow without bound.
	lossy  bool
	// closed is set by Close; Trace ignores events once it is true.
	closed bool
}
48
49func (t *basicTracer) Trace(evt *pb.TraceEvent) {
50	t.mx.Lock()
51	defer t.mx.Unlock()
52
53	if t.closed {
54		return
55	}
56
57	if t.lossy && len(t.buf) > TraceBufferSize {
58		log.Debug("trace buffer overflow; dropping trace event")
59	} else {
60		t.buf = append(t.buf, evt)
61	}
62
63	select {
64	case t.ch <- struct{}{}:
65	default:
66	}
67}
68
69func (t *basicTracer) Close() {
70	t.mx.Lock()
71	defer t.mx.Unlock()
72	if !t.closed {
73		t.closed = true
74		close(t.ch)
75	}
76}
77
// JSONTracer is a tracer that writes events to a file, encoded in ndjson
// (one JSON object per line).
type JSONTracer struct {
	basicTracer
	// w receives the encoded events; doWrite closes it once the tracer has
	// been closed and the remaining events are written.
	w io.WriteCloser
}
83
84// NewJsonTracer creates a new JSONTracer writing traces to file.
85func NewJSONTracer(file string) (*JSONTracer, error) {
86	return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
87}
88
89// OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.
90func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) {
91	f, err := os.OpenFile(file, flags, perm)
92	if err != nil {
93		return nil, err
94	}
95
96	tr := &JSONTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
97	go tr.doWrite()
98
99	return tr, nil
100}
101
102func (t *JSONTracer) doWrite() {
103	var buf []*pb.TraceEvent
104	enc := json.NewEncoder(t.w)
105	for {
106		_, ok := <-t.ch
107
108		t.mx.Lock()
109		tmp := t.buf
110		t.buf = buf[:0]
111		buf = tmp
112		t.mx.Unlock()
113
114		for i, evt := range buf {
115			err := enc.Encode(evt)
116			if err != nil {
117				log.Warnf("error writing event trace: %s", err.Error())
118			}
119			buf[i] = nil
120		}
121
122		if !ok {
123			t.w.Close()
124			return
125		}
126	}
127}
128
129var _ EventTracer = (*JSONTracer)(nil)
130
// PBTracer is a tracer that writes events to a file, as delimited protobufs.
type PBTracer struct {
	basicTracer
	// w receives the delimited protobuf messages; doWrite closes it once the
	// tracer has been closed and the remaining events are written.
	w io.WriteCloser
}
136
137func NewPBTracer(file string) (*PBTracer, error) {
138	return OpenPBTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
139}
140
141// OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.
142func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) {
143	f, err := os.OpenFile(file, flags, perm)
144	if err != nil {
145		return nil, err
146	}
147
148	tr := &PBTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
149	go tr.doWrite()
150
151	return tr, nil
152}
153
154func (t *PBTracer) doWrite() {
155	var buf []*pb.TraceEvent
156	w := protoio.NewDelimitedWriter(t.w)
157	for {
158		_, ok := <-t.ch
159
160		t.mx.Lock()
161		tmp := t.buf
162		t.buf = buf[:0]
163		buf = tmp
164		t.mx.Unlock()
165
166		for i, evt := range buf {
167			err := w.WriteMsg(evt)
168			if err != nil {
169				log.Warnf("error writing event trace: %s", err.Error())
170			}
171			buf[i] = nil
172		}
173
174		if !ok {
175			t.w.Close()
176			return
177		}
178	}
179}
180
181var _ EventTracer = (*PBTracer)(nil)
182
183const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")
184
// RemoteTracer is a tracer that sends trace events to a remote peer.
// Events are sent in gzip-compressed batches over a stream opened with
// RemoteTracerProtoID. Tracers built by NewRemoteTracer are lossy: once the
// pending buffer is full, new events are dropped.
type RemoteTracer struct {
	basicTracer
	// ctx bounds the tracer's attempts to open streams to peer; see openStream.
	ctx  context.Context
	// host is the local host used to open streams to peer.
	host host.Host
	// peer is the ID of the remote peer that receives the traces.
	peer peer.ID
}
192
193// NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi
194func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error) {
195	tr := &RemoteTracer{ctx: ctx, host: host, peer: pi.ID, basicTracer: basicTracer{ch: make(chan struct{}, 1), lossy: true}}
196	host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
197	go tr.doWrite()
198	return tr, nil
199}
200
// doWrite is the RemoteTracer's writer loop, started as a goroutine by
// NewRemoteTracer. It opens a stream to the remote peer. Each time it is woken
// through t.ch, it collects the pending events into a pb.TraceEventBatch and
// writes that batch as one delimited protobuf message through a gzip writer
// layered on the stream. The gzip writer is flushed after every batch.
//
// If a write or flush fails, the batch is discarded and the stream is reset.
// A new stream is then opened; openStream keeps retrying until it succeeds or
// t.ctx is done. If no stream can be opened, the goroutine returns and no
// further batches are sent. When t.ch is closed, the final batch is written
// and the stream is closed cleanly, or reset if that last write failed.
func (t *RemoteTracer) doWrite() {
	var buf []*pb.TraceEvent

	s, err := t.openStream()
	if err != nil {
		log.Debugf("error opening remote tracer stream: %s", err.Error())
		return
	}

	// batch is reused for every message; only its Batch slice changes.
	var batch pb.TraceEventBatch

	// w frames messages and writes them into gzipW, which compresses into s.
	gzipW := gzip.NewWriter(s)
	w := protoio.NewDelimitedWriter(gzipW)

	for {
		_, ok := <-t.ch

		// deadline for batch accumulation
		deadline := time.Now().Add(time.Second)

		// Give the batch up to a second to reach MinTraceBatchSize events,
		// re-checking every 100ms with the lock released so Trace can keep
		// appending in the meantime.
		t.mx.Lock()
		for len(t.buf) < MinTraceBatchSize && time.Now().Before(deadline) {
			t.mx.Unlock()
			time.Sleep(100 * time.Millisecond)
			t.mx.Lock()
		}

		// Take the pending events and hand our drained buffer back to Trace.
		tmp := t.buf
		t.buf = buf[:0]
		buf = tmp
		t.mx.Unlock()

		// Nothing to send.
		if len(buf) == 0 {
			goto end
		}

		batch.Batch = buf

		err = w.WriteMsg(&batch)
		if err != nil {
			log.Debugf("error writing trace event batch: %s", err)
			goto end
		}

		// Push the compressed batch out to the stream now rather than waiting
		// for gzip's internal buffer to fill.
		err = gzipW.Flush()
		if err != nil {
			log.Debugf("error flushin gzip stream: %s", err)
			goto end
		}

	end:
		// nil out the buffer to gc consumed events
		for i := range buf {
			buf[i] = nil
		}

		// t.ch was closed by Close: tear the stream down and stop. On success,
		// gzipW.Close writes the gzip trailer before the stream is closed.
		// NOTE(review): errors from gzipW.Close and s.Close are ignored.
		if !ok {
			if err != nil {
				s.Reset()
			} else {
				gzipW.Close()
				s.Close()
			}
			return
		}

		// The batch could not be sent: abandon this stream and reconnect.
		// gzipW is reset onto the new stream, so w (which wraps gzipW) writes
		// there too.
		if err != nil {
			s.Reset()
			s, err = t.openStream()
			if err != nil {
				log.Debugf("error opening remote tracer stream: %s", err.Error())
				return
			}

			gzipW.Reset(s)
		}
	}
}
279
280func (t *RemoteTracer) openStream() (network.Stream, error) {
281	for {
282		ctx, cancel := context.WithTimeout(t.ctx, time.Minute)
283		s, err := t.host.NewStream(ctx, t.peer, RemoteTracerProtoID)
284		cancel()
285		if err != nil {
286			if t.ctx.Err() != nil {
287				return nil, err
288			}
289
290			// wait a minute and try again, to account for transient server downtime
291			select {
292			case <-time.After(time.Minute):
293				continue
294			case <-t.ctx.Done():
295				return nil, t.ctx.Err()
296			}
297		}
298
299		return s, nil
300	}
301}
302
303var _ EventTracer = (*RemoteTracer)(nil)
304