1package pubsub 2 3import ( 4 "compress/gzip" 5 "context" 6 "encoding/json" 7 "io" 8 "os" 9 "sync" 10 "time" 11 12 pb "github.com/libp2p/go-libp2p-pubsub/pb" 13 14 "github.com/libp2p/go-libp2p-core/host" 15 "github.com/libp2p/go-libp2p-core/network" 16 "github.com/libp2p/go-libp2p-core/peer" 17 "github.com/libp2p/go-libp2p-core/peerstore" 18 "github.com/libp2p/go-libp2p-core/protocol" 19 20 "github.com/libp2p/go-msgio/protoio" 21) 22 23var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words. 24var MinTraceBatchSize = 16 25 26// rejection reasons 27const ( 28 rejectBlacklstedPeer = "blacklisted peer" 29 rejectBlacklistedSource = "blacklisted source" 30 rejectMissingSignature = "missing signature" 31 rejectUnexpectedSignature = "unexpected signature" 32 rejectUnexpectedAuthInfo = "unexpected auth info" 33 rejectInvalidSignature = "invalid signature" 34 rejectValidationQueueFull = "validation queue full" 35 rejectValidationThrottled = "validation throttled" 36 rejectValidationFailed = "validation failed" 37 rejectValidationIgnored = "validation ignored" 38 rejectSelfOrigin = "self originated message" 39) 40 41type basicTracer struct { 42 ch chan struct{} 43 mx sync.Mutex 44 buf []*pb.TraceEvent 45 lossy bool 46 closed bool 47} 48 49func (t *basicTracer) Trace(evt *pb.TraceEvent) { 50 t.mx.Lock() 51 defer t.mx.Unlock() 52 53 if t.closed { 54 return 55 } 56 57 if t.lossy && len(t.buf) > TraceBufferSize { 58 log.Debug("trace buffer overflow; dropping trace event") 59 } else { 60 t.buf = append(t.buf, evt) 61 } 62 63 select { 64 case t.ch <- struct{}{}: 65 default: 66 } 67} 68 69func (t *basicTracer) Close() { 70 t.mx.Lock() 71 defer t.mx.Unlock() 72 if !t.closed { 73 t.closed = true 74 close(t.ch) 75 } 76} 77 78// JSONTracer is a tracer that writes events to a file, encoded in ndjson. 79type JSONTracer struct { 80 basicTracer 81 w io.WriteCloser 82} 83 84// NewJsonTracer creates a new JSONTracer writing traces to file. 85func NewJSONTracer(file string) (*JSONTracer, error) { 86 return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) 87} 88 89// OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions. 90func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) { 91 f, err := os.OpenFile(file, flags, perm) 92 if err != nil { 93 return nil, err 94 } 95 96 tr := &JSONTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}} 97 go tr.doWrite() 98 99 return tr, nil 100} 101 102func (t *JSONTracer) doWrite() { 103 var buf []*pb.TraceEvent 104 enc := json.NewEncoder(t.w) 105 for { 106 _, ok := <-t.ch 107 108 t.mx.Lock() 109 tmp := t.buf 110 t.buf = buf[:0] 111 buf = tmp 112 t.mx.Unlock() 113 114 for i, evt := range buf { 115 err := enc.Encode(evt) 116 if err != nil { 117 log.Warnf("error writing event trace: %s", err.Error()) 118 } 119 buf[i] = nil 120 } 121 122 if !ok { 123 t.w.Close() 124 return 125 } 126 } 127} 128 129var _ EventTracer = (*JSONTracer)(nil) 130 131// PBTracer is a tracer that writes events to a file, as delimited protobufs. 132type PBTracer struct { 133 basicTracer 134 w io.WriteCloser 135} 136 137func NewPBTracer(file string) (*PBTracer, error) { 138 return OpenPBTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) 139} 140 141// OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions. 142func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) { 143 f, err := os.OpenFile(file, flags, perm) 144 if err != nil { 145 return nil, err 146 } 147 148 tr := &PBTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}} 149 go tr.doWrite() 150 151 return tr, nil 152} 153 154func (t *PBTracer) doWrite() { 155 var buf []*pb.TraceEvent 156 w := protoio.NewDelimitedWriter(t.w) 157 for { 158 _, ok := <-t.ch 159 160 t.mx.Lock() 161 tmp := t.buf 162 t.buf = buf[:0] 163 buf = tmp 164 t.mx.Unlock() 165 166 for i, evt := range buf { 167 err := w.WriteMsg(evt) 168 if err != nil { 169 log.Warnf("error writing event trace: %s", err.Error()) 170 } 171 buf[i] = nil 172 } 173 174 if !ok { 175 t.w.Close() 176 return 177 } 178 } 179} 180 181var _ EventTracer = (*PBTracer)(nil) 182 183const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0") 184 185// RemoteTracer is a tracer that sends trace events to a remote peer 186type RemoteTracer struct { 187 basicTracer 188 ctx context.Context 189 host host.Host 190 peer peer.ID 191} 192 193// NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi 194func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error) { 195 tr := &RemoteTracer{ctx: ctx, host: host, peer: pi.ID, basicTracer: basicTracer{ch: make(chan struct{}, 1), lossy: true}} 196 host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL) 197 go tr.doWrite() 198 return tr, nil 199} 200 201func (t *RemoteTracer) doWrite() { 202 var buf []*pb.TraceEvent 203 204 s, err := t.openStream() 205 if err != nil { 206 log.Debugf("error opening remote tracer stream: %s", err.Error()) 207 return 208 } 209 210 var batch pb.TraceEventBatch 211 212 gzipW := gzip.NewWriter(s) 213 w := protoio.NewDelimitedWriter(gzipW) 214 215 for { 216 _, ok := <-t.ch 217 218 // deadline for batch accumulation 219 deadline := time.Now().Add(time.Second) 220 221 t.mx.Lock() 222 for len(t.buf) < MinTraceBatchSize && time.Now().Before(deadline) { 223 t.mx.Unlock() 224 time.Sleep(100 * time.Millisecond) 225 t.mx.Lock() 226 } 227 228 tmp := t.buf 229 t.buf = buf[:0] 230 buf = tmp 231 t.mx.Unlock() 232 233 if len(buf) == 0 { 234 goto end 235 } 236 237 batch.Batch = buf 238 239 err = w.WriteMsg(&batch) 240 if err != nil { 241 log.Debugf("error writing trace event batch: %s", err) 242 goto end 243 } 244 245 err = gzipW.Flush() 246 if err != nil { 247 log.Debugf("error flushin gzip stream: %s", err) 248 goto end 249 } 250 251 end: 252 // nil out the buffer to gc consumed events 253 for i := range buf { 254 buf[i] = nil 255 } 256 257 if !ok { 258 if err != nil { 259 s.Reset() 260 } else { 261 gzipW.Close() 262 s.Close() 263 } 264 return 265 } 266 267 if err != nil { 268 s.Reset() 269 s, err = t.openStream() 270 if err != nil { 271 log.Debugf("error opening remote tracer stream: %s", err.Error()) 272 return 273 } 274 275 gzipW.Reset(s) 276 } 277 } 278} 279 280func (t *RemoteTracer) openStream() (network.Stream, error) { 281 for { 282 ctx, cancel := context.WithTimeout(t.ctx, time.Minute) 283 s, err := t.host.NewStream(ctx, t.peer, RemoteTracerProtoID) 284 cancel() 285 if err != nil { 286 if t.ctx.Err() != nil { 287 return nil, err 288 } 289 290 // wait a minute and try again, to account for transient server downtime 291 select { 292 case <-time.After(time.Minute): 293 continue 294 case <-t.ctx.Done(): 295 return nil, t.ctx.Err() 296 } 297 } 298 299 return s, nil 300 } 301} 302 303var _ EventTracer = (*RemoteTracer)(nil) 304