1// Copyright (C) MongoDB, Inc. 2014-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package mongoreplay 8 9import ( 10 "fmt" 11 "log" 12 "sync" 13 "time" 14 15 "github.com/google/gopacket" 16 "github.com/google/gopacket/layers" 17 "github.com/google/gopacket/tcpassembly" 18) 19 20var memLog = new(bool) 21var debugLog = new(bool) 22 23const invalidSequence = -1 24const uint32Max = 0xFFFFFFFF 25 26// Sequence is a TCP sequence number. It provides a few convenience functions 27// for handling TCP wrap-around. The sequence should always be in the range 28// [0,0xFFFFFFFF]... its other bits are simply used in wrap-around calculations 29// and should never be set. 30type Sequence int64 31 32// Difference defines an ordering for comparing TCP sequences that's safe for 33// roll-overs. It returns: 34// > 0 : if t comes after s 35// < 0 : if t comes before s 36// 0 : if t == s 37// The number returned is the sequence difference, so 4.Difference(8) will 38// return 4. 39// 40// It handles rollovers by considering any sequence in the first quarter of the 41// uint32 space to be after any sequence in the last quarter of that space, thus 42// wrapping the uint32 space. 43func (s Sequence) Difference(t Sequence) int { 44 if s > uint32Max-uint32Max/4 && t < uint32Max/4 { 45 t += uint32Max 46 } else if t > uint32Max-uint32Max/4 && s < uint32Max/4 { 47 s += uint32Max 48 } 49 return int(t - s) 50} 51 52// Add adds an integer to a sequence and returns the resulting sequence. 53func (s Sequence) Add(t int) Sequence { 54 return (s + Sequence(t)) & uint32Max 55} 56 57// Reassembly objects are passed by an Assembler into Streams using the 58// Reassembled call. Callers should not need to create these structs themselves 59// except for testing. 
60type Reassembly struct { 61 // Bytes is the next set of bytes in the stream. May be empty. 62 Bytes []byte 63 // Skip is set to non-zero if bytes were skipped between this and the last 64 // Reassembly. If this is the first packet in a connection and we didn't 65 // see the start, we have no idea how many bytes we skipped, so we set it to 66 // -1. Otherwise, it's set to the number of bytes skipped. 67 Skip int 68 // Start is set if this set of bytes has a TCP SYN accompanying it. 69 Start bool 70 // End is set if this set of bytes has a TCP FIN or RST accompanying it. 71 End bool 72 // Seen is the timestamp this set of bytes was pulled off the wire. 73 Seen time.Time 74} 75 76const pageBytes = 1900 77 78// page is used to store TCP data we're not ready for yet (out-of-order 79// packets). Unused pages are stored in and returned from a pageCache, which 80// avoids memory allocation. Used pages are stored in a doubly-linked list in a 81// connection. 82type page struct { 83 tcpassembly.Reassembly 84 seq Sequence 85 index int 86 prev, next *page 87 buf [pageBytes]byte 88} 89 90// pageCache is a concurrency-unsafe store of page objects we use to avoid 91// memory allocation as much as we can. It grows but never shrinks. 92type pageCache struct { 93 free []*page 94 pcSize int 95 size, used int 96 pages [][]page 97 pageRequests int64 98} 99 100const initialAllocSize = 1024 101 102func newPageCache() *pageCache { 103 pc := &pageCache{ 104 free: make([]*page, 0, initialAllocSize), 105 pcSize: initialAllocSize, 106 } 107 pc.grow() 108 return pc 109} 110 111// grow exponentially increases the size of our page cache as much as necessary. 
112func (c *pageCache) grow() { 113 pages := make([]page, c.pcSize) 114 c.pages = append(c.pages, pages) 115 c.size += c.pcSize 116 for i := range pages { 117 c.free = append(c.free, &pages[i]) 118 } 119 if *memLog { 120 log.Println("PageCache: created", c.pcSize, "new pages") 121 } 122 c.pcSize *= 2 123} 124 125// next returns a clean, ready-to-use page object. 126func (c *pageCache) next(ts time.Time) (p *page) { 127 if *memLog { 128 c.pageRequests++ 129 if c.pageRequests&0xFFFF == 0 { 130 log.Println("PageCache:", c.pageRequests, "requested,", c.used, "used,", len(c.free), "free") 131 } 132 } 133 if len(c.free) == 0 { 134 c.grow() 135 } 136 i := len(c.free) - 1 137 p, c.free = c.free[i], c.free[:i] 138 p.prev = nil 139 p.next = nil 140 p.Seen = ts 141 p.Bytes = p.buf[:0] 142 c.used++ 143 return p 144} 145 146// replace replaces a page into the pageCache. 147func (c *pageCache) replace(p *page) { 148 c.used-- 149 c.free = append(c.free, p) 150} 151 152// Stream is implemented by the caller to handle incoming reassembled TCP data. 153// Callers create a StreamFactory, then StreamPool uses it to create a new 154// Stream for every TCP stream. 155// 156// assembly will, in order: 157// 1) Create the stream via StreamFactory.New 158// 2) Call Reassembled 0 or more times, passing in reassembled TCP data in 159// order 160// 3) Call ReassemblyComplete one time, after which the stream is 161// dereferenced by assembly. 162type Stream interface { 163 // Reassembled is called zero or more times. Assembly guarantees that the 164 // set of all Reassembly objects passed in during all calls are presented in 165 // the order they appear in the TCP stream. Reassembly objects are reused 166 // after each Reassembled call, so it's important to copy anything you need 167 // out of them (specifically out of Reassembly.Bytes) that you need to stay 168 // around after you return from the Reassembled call. 
169 Reassembled([]Reassembly) 170 // ReassemblyComplete is called when assembly decides there is no more data 171 // for this Stream, either because a FIN or RST packet was seen, or because 172 // the stream has timed out without any new packet data (due to a call to 173 // FlushOlderThan). 174 ReassemblyComplete() 175} 176 177// StreamFactory is used by assembly to create a new stream for each new TCP 178// session. 179type StreamFactory interface { 180 // New should return a new stream for the given TCP key. 181 New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream 182} 183 184func (p *StreamPool) connections() []*connection { 185 p.mu.RLock() 186 conns := make([]*connection, 0, len(p.conns)) 187 for _, conn := range p.conns { 188 conns = append(conns, conn) 189 } 190 p.mu.RUnlock() 191 return conns 192} 193 194// FlushOlderThan finds any streams waiting for packets older than the given 195// time, and pushes through the data they have (IE: tells them to stop waiting 196// and skip the data they're waiting for). 197// 198// Each Stream maintains a list of zero or more sets of bytes it has received 199// out-of-order. For example, if it has processed up through sequence number 200// 10, it might have bytes [15-20), [20-25), [30,50) in its list. Each set of 201// bytes also has the timestamp it was originally viewed. A flush call will 202// look at the smallest subsequent set of bytes, in this case [15-20), and if 203// its timestamp is older than the passed-in time, it will push it and all 204// contiguous byte-sets out to the Stream's Reassembled function. In this case, 205// it will push [15-20), but also [20-25), since that's contiguous. It will 206// only push [30-50) if its timestamp is also older than the passed-in time, 207// otherwise it will wait until the next FlushOlderThan to see if bytes [25-30) 208// come in. 
209// 210// If it pushes all bytes (or there were no sets of bytes to begin with) AND the 211// connection has not received any bytes since the passed-in time, the 212// connection will be closed. 213// 214// Returns the number of connections flushed, and of those, the number closed 215// because of the flush. 216func (a *Assembler) FlushOlderThan(t time.Time) (flushed, closed int) { 217 conns := a.connPool.connections() 218 closes := 0 219 flushes := 0 220 for _, conn := range conns { 221 flushed := false 222 conn.mu.Lock() 223 if conn.closed { 224 // Already closed connection, nothing to do here. 225 conn.mu.Unlock() 226 continue 227 } 228 for conn.first != nil && conn.first.Seen.Before(t) { 229 a.skipFlush(conn) 230 flushed = true 231 if conn.closed { 232 closes++ 233 break 234 } 235 } 236 if !conn.closed && conn.first == nil && conn.lastSeen.Before(t) { 237 flushed = true 238 a.closeConnection(conn) 239 closes++ 240 } 241 if flushed { 242 flushes++ 243 } 244 conn.mu.Unlock() 245 } 246 return flushes, closes 247} 248 249// FlushAll flushes all remaining data into all remaining connections, closing 250// those connections. It returns the total number of connections flushed/closed 251// by the call. 252func (a *Assembler) FlushAll() (closed int) { 253 conns := a.connPool.connections() 254 closed = len(conns) 255 for _, conn := range conns { 256 conn.mu.Lock() 257 for !conn.closed { 258 a.skipFlush(conn) 259 } 260 conn.mu.Unlock() 261 } 262 return 263} 264 265type key [2]gopacket.Flow 266 267func (k *key) String() string { 268 return fmt.Sprintf("%s:%s", k[0], k[1]) 269} 270 271// StreamPool stores all streams created by Assemblers, allowing multiple 272// assemblers to work together on stream processing while enforcing the fact 273// that a single stream receives its data serially. It is safe for concurrency, 274// usable by multiple Assemblers at once. 
275// 276// StreamPool handles the creation and storage of Stream objects used by one or 277// more Assembler objects. When a new TCP stream is found by an Assembler, it 278// creates an associated Stream by calling its StreamFactory's New method. 279// Thereafter (until the stream is closed), that Stream object will receive 280// assembled TCP data via Assembler's calls to the stream's Reassembled 281// function. 282// 283// Like the Assembler, StreamPool attempts to minimize allocation. Unlike the 284// Assembler, though, it does have to do some locking to make sure that the 285// connection objects it stores are accessible to multiple Assemblers. 286type StreamPool struct { 287 conns map[key]*connection 288 users int 289 mu sync.RWMutex 290 factory StreamFactory 291 free []*connection 292 all [][]connection 293 nextAlloc int 294 newConnectionCount int64 295} 296 297func (p *StreamPool) grow() { 298 conns := make([]connection, p.nextAlloc) 299 p.all = append(p.all, conns) 300 for i := range conns { 301 p.free = append(p.free, &conns[i]) 302 } 303 if *memLog { 304 log.Println("StreamPool: created", p.nextAlloc, "new connections") 305 } 306 p.nextAlloc *= 2 307} 308 309// NewStreamPool creates a new connection pool. Streams will 310// be created as necessary using the passed-in StreamFactory. 311func NewStreamPool(factory StreamFactory) *StreamPool { 312 return &StreamPool{ 313 conns: make(map[key]*connection, initialAllocSize), 314 free: make([]*connection, 0, initialAllocSize), 315 factory: factory, 316 nextAlloc: initialAllocSize, 317 } 318} 319 320const assemblerReturnValueInitialSize = 16 321 322// NewAssembler creates a new assembler. Pass in the StreamPool 323// to use, may be shared across assemblers. 324// 325// This sets some sane defaults for the assembler options, 326// see DefaultAssemblerOptions for details. 
327func NewAssembler(pool *StreamPool) *Assembler { 328 pool.mu.Lock() 329 pool.users++ 330 pool.mu.Unlock() 331 return &Assembler{ 332 ret: make([]tcpassembly.Reassembly, assemblerReturnValueInitialSize), 333 pc: newPageCache(), 334 connPool: pool, 335 AssemblerOptions: DefaultAssemblerOptions, 336 } 337} 338 339// DefaultAssemblerOptions provides default options for an assembler. 340// These options are used by default when calling NewAssembler, so if 341// modified before a NewAssembler call they'll affect the resulting Assembler. 342// 343// Note that the default options can result in ever-increasing memory usage 344// unless one of the Flush* methods is called on a regular basis. 345var DefaultAssemblerOptions = AssemblerOptions{ 346 MaxBufferedPagesPerConnection: 0, // unlimited 347 MaxBufferedPagesTotal: 0, // unlimited 348} 349 350type connection struct { 351 key key 352 pages int 353 first, last *page 354 nextSeq Sequence 355 created, lastSeen time.Time 356 stream tcpassembly.Stream 357 closed bool 358 mu sync.Mutex 359} 360 361func (conn *connection) reset(k key, s tcpassembly.Stream, ts time.Time) { 362 conn.key = k 363 conn.pages = 0 364 conn.first, conn.last = nil, nil 365 conn.nextSeq = invalidSequence 366 conn.created = ts 367 conn.stream = s 368 conn.closed = false 369} 370 371// AssemblerOptions controls the behavior of each assembler. Modify the 372// options of each assembler you create to change their behavior. 373type AssemblerOptions struct { 374 // MaxBufferedPagesTotal is an upper limit on the total number of pages to 375 // buffer while waiting for out-of-order packets. Once this limit is 376 // reached, the assembler will degrade to flushing every connection it gets 377 // a packet for. If <= 0, this is ignored. 378 MaxBufferedPagesTotal int 379 // MaxBufferedPagesPerConnection is an upper limit on the number of pages 380 // buffered for a single connection. 
Should this limit be reached for a 381 // particular connection, the smallest sequence number will be flushed, 382 // along with any contiguous data. If <= 0, this is ignored. 383 MaxBufferedPagesPerConnection int 384} 385 386// Assembler handles reassembling TCP streams. It is not safe for 387// concurrency... after passing a packet in via the Assemble call, the caller 388// must wait for that call to return before calling Assemble again. Callers can 389// get around this by creating multiple assemblers that share a StreamPool. In 390// that case, each individual stream will still be handled serially (each stream 391// has an individual mutex associated with it), however multiple assemblers can 392// assemble different connections concurrently. 393// 394// The Assembler provides (hopefully) fast TCP stream re-assembly for sniffing 395// applications written in Go. The Assembler uses the following methods to be 396// as fast as possible, to keep packet processing speedy: 397// 398// Avoids Lock Contention 399// 400// Assemblers locks connections, but each connection has an individual lock, and 401// rarely will two Assemblers be looking at the same connection. Assemblers 402// lock the StreamPool when looking up connections, but they use Reader locks 403// initially, and only force a write lock if they need to create a new 404// connection or close one down. These happen much less frequently than 405// individual packet handling. 406// 407// Each assembler runs in its own goroutine, and the only state shared between 408// goroutines is through the StreamPool. Thus all internal Assembler state can 409// be handled without any locking. 
410// 411// NOTE: If you can guarantee that packets going to a set of Assemblers will 412// contain information on different connections per Assembler (for example, 413// they're already hashed by PF_RING hashing or some other hashing mechanism), 414// then we recommend you use a separate StreamPool per Assembler, thus avoiding 415// all lock contention. Only when different Assemblers could receive packets 416// for the same Stream should a StreamPool be shared between them. 417// 418// Avoids Memory Copying 419// 420// In the common case, handling of a single TCP packet should result in zero 421// memory allocations. The Assembler will look up the connection, figure out 422// that the packet has arrived in order, and immediately pass that packet on to 423// the appropriate connection's handling code. Only if a packet arrives out of 424// order is its contents copied and stored in memory for later. 425// 426// Avoids Memory Allocation 427// 428// Assemblers try very hard to not use memory allocation unless absolutely 429// necessary. Packet data for sequential packets is passed directly to streams 430// with no copying or allocation. Packet data for out-of-order packets is 431// copied into reusable pages, and new pages are only allocated rarely when the 432// page cache runs out. Page caches are Assembler-specific, thus not used 433// concurrently and requiring no locking. 434// 435// Internal representations for connection objects are also reused over time. 436// Because of this, the most common memory allocation done by the Assembler is 437// generally what's done by the caller in StreamFactory.New. If no allocation 438// is done there, then very little allocation is done ever, mostly to handle 439// large increases in bandwidth or numbers of connections. 440// 441// TODO: The page caches used by an Assembler will grow to the size necessary 442// to handle a workload, and currently will never shrink. 
This means that 443// traffic spikes can result in large memory usage which isn't garbage collected 444// when typical traffic levels return. 445type Assembler struct { 446 AssemblerOptions 447 ret []tcpassembly.Reassembly 448 pc *pageCache 449 connPool *StreamPool 450} 451 452func (p *StreamPool) newConnection(k key, s tcpassembly.Stream, ts time.Time) (c *connection) { 453 if *memLog { 454 p.newConnectionCount++ 455 if p.newConnectionCount&0x7FFF == 0 { 456 log.Println("StreamPool:", p.newConnectionCount, "requests,", len(p.conns), "used,", len(p.free), "free") 457 } 458 } 459 if len(p.free) == 0 { 460 p.grow() 461 } 462 index := len(p.free) - 1 463 c, p.free = p.free[index], p.free[:index] 464 c.reset(k, s, ts) 465 return c 466} 467 468// getConnection returns a connection. If end is true and a connection 469// does not already exist, returns nil. This allows us to check for a 470// connection without actually creating one if it doesn't already exist. 471func (p *StreamPool) getConnection(k key, end bool, ts time.Time) *connection { 472 p.mu.RLock() 473 conn := p.conns[k] 474 p.mu.RUnlock() 475 if end || conn != nil { 476 return conn 477 } 478 s := p.factory.New(k[0], k[1]) 479 p.mu.Lock() 480 conn = p.newConnection(k, s, ts) 481 if conn2 := p.conns[k]; conn2 != nil { 482 p.mu.Unlock() 483 return conn2 484 } 485 p.conns[k] = conn 486 p.mu.Unlock() 487 return conn 488} 489 490// Assemble calls AssembleWithTimestamp with the current timestamp, useful for 491// packets being read directly off the wire. 492func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP) { 493 a.AssembleWithTimestamp(netFlow, t, time.Now()) 494} 495 496// AssembleWithTimestamp reassembles the given TCP packet into its appropriate 497// stream. 498// 499// The timestamp passed in must be the timestamp the packet was seen. For 500// packets read off the wire, time.Now() should be fine. For packets read from 501// PCAP files, CaptureInfo.Timestamp should be passed in. 
// This timestamp will affect which streams are flushed by a call to
// FlushOlderThan.
//
// Each Assemble call results in, in order:
//
//	zero or one calls to StreamFactory.New, creating a stream
//	zero or one calls to Reassembled on a single stream
//	zero or one calls to ReassemblyComplete on the same stream
func (a *Assembler) AssembleWithTimestamp(netFlow gopacket.Flow, t *layers.TCP, timestamp time.Time) {
	// Ignore empty TCP packets
	if !t.SYN && !t.FIN && !t.RST && len(t.LayerPayload()) == 0 {
		return
	}

	// Reuse the result slice's backing array for this packet.
	a.ret = a.ret[:0]
	key := key{netFlow, t.TransportFlow()}
	var conn *connection
	// This for loop handles a race condition where a connection will close,
	// lock the connection pool, and remove itself, but before it locked the
	// connection pool it's returned to another Assemble statement. This should
	// loop 0-1 times for the VAST majority of cases.
	for {
		// The second argument is getConnection's "end" flag: a packet with no
		// SYN and no payload must not create a connection by itself.
		conn = a.connPool.getConnection(
			key, !t.SYN && len(t.LayerPayload()) == 0, timestamp)
		if conn == nil {
			if *debugLog {
				log.Printf("%v got empty packet on otherwise empty connection", key)
			}
			return
		}
		conn.mu.Lock()
		if !conn.closed {
			// Leave the loop with conn.mu held; it is released at the end of
			// this function.
			break
		}
		conn.mu.Unlock()
	}
	// lastSeen only ever moves forward.
	if conn.lastSeen.Before(timestamp) {
		conn.lastSeen = timestamp
	}
	seq, bytes := Sequence(t.Seq), t.Payload

	if conn.nextSeq == invalidSequence {
		// Handling the first packet we've seen on the stream.
		skip := 0
		if !t.SYN {
			// don't add 1 since we're just going to assume the sequence number
			// without the SYN packet.
			// stream was picked up somewhere in the middle, so indicate that we
			// don't know how many packets came before it.
			conn.nextSeq = seq.Add(len(bytes))
			skip = -1
		} else {
			// for SYN packets, also increment the sequence number by 1
			conn.nextSeq = seq.Add(len(bytes) + 1)
		}
		a.ret = append(a.ret, tcpassembly.Reassembly{
			Bytes: bytes,
			Skip:  skip,
			Start: t.SYN,
			Seen:  timestamp,
		})
		// NOTE(review): the same packet is also buffered as pages here. Since
		// nextSeq has already been advanced past it, it looks like
		// addContiguous pops those pages straight away and byteSpan trims
		// them to no data, so the visible effect would be carrying the
		// packet's FIN/RST (End) flag into a.ret - confirm this is the intent.
		a.insertIntoConn(t, conn, timestamp)
	} else if diff := conn.nextSeq.Difference(seq); diff > 0 {
		// The packet starts beyond what we expect next: buffer it.
		a.insertIntoConn(t, conn, timestamp)
	} else {
		// The packet starts at or before the expected sequence number; keep
		// only the part of the payload we have not already delivered.
		bytes, conn.nextSeq = byteSpan(conn.nextSeq, seq, bytes)
		a.ret = append(a.ret, tcpassembly.Reassembly{
			Bytes: bytes,
			Skip:  0,
			End:   t.RST || t.FIN,
			Seen:  timestamp,
		})
	}
	if len(a.ret) > 0 {
		a.sendToConnection(conn)
	}
	conn.mu.Unlock()
}

// byteSpan works out which part of a payload starting at sequence number
// received is still needed when the next expected sequence number is
// expected. It returns the bytes to deliver and the sequence number expected
// after them:
//   - if expected is invalidSequence, or received is at or beyond expected,
//     all of bytes is returned and next is received advanced by len(bytes);
//   - if the payload is shorter than the distance from received to expected,
//     nothing is returned and next stays at expected;
//   - otherwise the leading bytes that precede expected are dropped.
func byteSpan(expected, received Sequence, bytes []byte) (toSend []byte, next Sequence) {
	if expected == invalidSequence {
		return bytes, received.Add(len(bytes))
	}
	// span is how far expected lies ahead of received.
	span := int(received.Difference(expected))
	if span <= 0 {
		return bytes, received.Add(len(bytes))
	} else if len(bytes) < span {
		return nil, expected
	}
	return bytes[span:], expected.Add(len(bytes) - span)
}

// sendToConnection sends the current values in a.ret to the connection, closing
// the connection if the last thing sent had End set.
func (a *Assembler) sendToConnection(conn *connection) {
	// Pull in any buffered pages that have become contiguous.
	a.addContiguous(conn)
	if conn.stream == nil {
		panic("why?")
	}
	conn.stream.Reassembled(a.ret)
	// Callers in this file only invoke this with a non-empty a.ret.
	if a.ret[len(a.ret)-1].End {
		a.closeConnection(conn)
	}
}

// addContiguous adds contiguous byte-sets to a connection.
func (a *Assembler) addContiguous(conn *connection) {
	// Keep popping while the first buffered page starts at or before the next
	// expected sequence number.
	for conn.first != nil && conn.nextSeq.Difference(conn.first.seq) <= 0 {
		a.addNextFromConn(conn)
	}
}

// skipFlush skips the first set of bytes we're waiting for and returns the
// first set of bytes we have. If we have no bytes pending, it closes the
// connection.
616func (a *Assembler) skipFlush(conn *connection) { 617 if *debugLog { 618 log.Printf("%v skipFlush %v", conn.key, conn.nextSeq) 619 } 620 if conn.first == nil { 621 a.closeConnection(conn) 622 return 623 } 624 a.ret = a.ret[:0] 625 a.addNextFromConn(conn) 626 a.addContiguous(conn) 627 a.sendToConnection(conn) 628} 629 630func (p *StreamPool) remove(conn *connection) { 631 p.mu.Lock() 632 delete(p.conns, conn.key) 633 p.free = append(p.free, conn) 634 p.mu.Unlock() 635} 636 637func (a *Assembler) closeConnection(conn *connection) { 638 if *debugLog { 639 log.Printf("%v closing", conn.key) 640 } 641 conn.stream.ReassemblyComplete() 642 conn.closed = true 643 a.connPool.remove(conn) 644 for p := conn.first; p != nil; p = p.next { 645 a.pc.replace(p) 646 } 647} 648 649// traverseConn traverses our doubly-linked list of pages for the correct 650// position to put the given sequence number. Note that it traverses backwards, 651// starting at the highest sequence number and going down, since we assume the 652// common case is that TCP packets for a stream will appear in-order, with 653// minimal loss or packet reordering. 654func (conn *connection) traverseConn(seq Sequence) (prev, current *page) { 655 prev = conn.last 656 for prev != nil && prev.seq.Difference(seq) < 0 { 657 current = prev 658 prev = current.prev 659 } 660 return 661} 662 663// pushBetween inserts the doubly-linked list first-...-last in between the 664// nodes prev-next in another doubly-linked list. If prev is nil, makes first 665// the new first page in the connection's list. If next is nil, makes last the 666// new last page in the list. first/last may point to the same page. 
667func (conn *connection) pushBetween(prev, next, first, last *page) { 668 // Maintain our doubly linked list 669 if next == nil || conn.last == nil { 670 conn.last = last 671 } else { 672 last.next = next 673 next.prev = last 674 } 675 if prev == nil || conn.first == nil { 676 conn.first = first 677 } else { 678 first.prev = prev 679 prev.next = first 680 } 681} 682 683func (a *Assembler) insertIntoConn(t *layers.TCP, conn *connection, ts time.Time) { 684 if conn.first != nil && conn.first.seq == conn.nextSeq { 685 panic("wtf") 686 } 687 p, p2, numPages := a.pagesFromTCP(t, ts) 688 prev, current := conn.traverseConn(Sequence(t.Seq)) 689 conn.pushBetween(prev, current, p, p2) 690 conn.pages += numPages 691 if (a.MaxBufferedPagesPerConnection > 0 && conn.pages >= a.MaxBufferedPagesPerConnection) || 692 (a.MaxBufferedPagesTotal > 0 && a.pc.used >= a.MaxBufferedPagesTotal) { 693 if *debugLog { 694 log.Printf("%v hit max buffer size: %+v, %v, %v", conn.key, a.AssemblerOptions, conn.pages, a.pc.used) 695 } 696 a.addNextFromConn(conn) 697 } 698} 699 700// pagesFromTCP creates a page (or set of pages) from a TCP packet. Note that 701// it should NEVER receive a SYN packet, as it doesn't handle sequences 702// correctly. 703// 704// It returns the first and last page in its doubly-linked list of new pages. 
705func (a *Assembler) pagesFromTCP(t *layers.TCP, ts time.Time) (p, p2 *page, numPages int) { 706 first := a.pc.next(ts) 707 current := first 708 numPages++ 709 seq, bytes := Sequence(t.Seq), t.Payload 710 for { 711 length := min(len(bytes), pageBytes) 712 current.Bytes = current.buf[:length] 713 copy(current.Bytes, bytes) 714 current.seq = seq 715 bytes = bytes[length:] 716 if len(bytes) == 0 { 717 break 718 } 719 seq = seq.Add(length) 720 current.next = a.pc.next(ts) 721 current.next.prev = current 722 current = current.next 723 numPages++ 724 } 725 current.End = t.RST || t.FIN 726 return first, current, numPages 727} 728 729// addNextFromConn pops the first page from a connection off and adds it to the 730// return array. 731func (a *Assembler) addNextFromConn(conn *connection) { 732 if conn.nextSeq == invalidSequence { 733 conn.first.Skip = -1 734 } else if diff := conn.nextSeq.Difference(conn.first.seq); diff > 0 { 735 conn.first.Skip = int(diff) 736 } 737 conn.first.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, conn.first.seq, conn.first.Bytes) 738 if *debugLog { 739 log.Printf("%v adding from conn (%v, %v)", conn.key, conn.first.seq, conn.nextSeq) 740 } 741 a.ret = append(a.ret, conn.first.Reassembly) 742 a.pc.replace(conn.first) 743 if conn.first == conn.last { 744 conn.first = nil 745 conn.last = nil 746 } else { 747 conn.first = conn.first.next 748 conn.first.prev = nil 749 } 750 conn.pages-- 751} 752