1// Copyright (C) MongoDB, Inc. 2014-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongoreplay
8
9import (
10	"fmt"
11	"log"
12	"sync"
13	"time"
14
15	"github.com/google/gopacket"
16	"github.com/google/gopacket/layers"
17	"github.com/google/gopacket/tcpassembly"
18)
19
var (
	// memLog, when it points at true, turns on the allocation statistics
	// that the page cache and the stream pool write through the log package.
	memLog = new(bool)
	// debugLog, when it points at true, turns on per-connection trace
	// logging in the assembler.
	debugLog = new(bool)
)

const (
	// invalidSequence is the sentinel stored in a connection's nextSeq until
	// the first packet of the stream has been processed.
	invalidSequence = -1
	// uint32Max is the largest value a 32-bit TCP sequence number can hold.
	uint32Max = 0xFFFFFFFF
)
25
// Sequence holds a TCP sequence number.  Valid values lie in
// [0,0xFFFFFFFF]; the type is wider than 32 bits only so that the
// wrap-around helpers have headroom for intermediate results, and the upper
// bits should never be set on a stored value.
type Sequence int64
31
32// Difference defines an ordering for comparing TCP sequences that's safe for
33// roll-overs.  It returns:
34//    > 0 : if t comes after s
35//    < 0 : if t comes before s
36//      0 : if t == s
37// The number returned is the sequence difference, so 4.Difference(8) will
38// return 4.
39//
40// It handles rollovers by considering any sequence in the first quarter of the
41// uint32 space to be after any sequence in the last quarter of that space, thus
42// wrapping the uint32 space.
43func (s Sequence) Difference(t Sequence) int {
44	if s > uint32Max-uint32Max/4 && t < uint32Max/4 {
45		t += uint32Max
46	} else if t > uint32Max-uint32Max/4 && s < uint32Max/4 {
47		s += uint32Max
48	}
49	return int(t - s)
50}
51
52// Add adds an integer to a sequence and returns the resulting sequence.
53func (s Sequence) Add(t int) Sequence {
54	return (s + Sequence(t)) & uint32Max
55}
56
// Reassembly is the unit of data an Assembler hands to a Stream through its
// Reassembled call.  Outside of tests, callers normally only consume these
// values rather than build them.
type Reassembly struct {
	// Bytes holds the next chunk of the stream; it is allowed to be empty.
	Bytes []byte
	// Skip reports a gap between the previous Reassembly and this one: the
	// number of bytes that were skipped, or -1 when this is the first packet
	// seen on a connection whose start was missed and the size of the gap is
	// therefore unknown.  Zero means nothing was skipped.
	Skip int
	// Start is true when a TCP SYN accompanied these bytes.
	Start bool
	// End is true when a TCP FIN or RST accompanied these bytes.
	End bool
	// Seen records when these bytes were pulled off the wire.
	Seen time.Time
}
75
76const pageBytes = 1900
77
78// page is used to store TCP data we're not ready for yet (out-of-order
79// packets).  Unused pages are stored in and returned from a pageCache, which
80// avoids memory allocation.  Used pages are stored in a doubly-linked list in a
81// connection.
82type page struct {
83	tcpassembly.Reassembly
84	seq        Sequence
85	index      int
86	prev, next *page
87	buf        [pageBytes]byte
88}
89
90// pageCache is a concurrency-unsafe store of page objects we use to avoid
91// memory allocation as much as we can.  It grows but never shrinks.
92type pageCache struct {
93	free         []*page
94	pcSize       int
95	size, used   int
96	pages        [][]page
97	pageRequests int64
98}
99
100const initialAllocSize = 1024
101
102func newPageCache() *pageCache {
103	pc := &pageCache{
104		free:   make([]*page, 0, initialAllocSize),
105		pcSize: initialAllocSize,
106	}
107	pc.grow()
108	return pc
109}
110
111// grow exponentially increases the size of our page cache as much as necessary.
112func (c *pageCache) grow() {
113	pages := make([]page, c.pcSize)
114	c.pages = append(c.pages, pages)
115	c.size += c.pcSize
116	for i := range pages {
117		c.free = append(c.free, &pages[i])
118	}
119	if *memLog {
120		log.Println("PageCache: created", c.pcSize, "new pages")
121	}
122	c.pcSize *= 2
123}
124
125// next returns a clean, ready-to-use page object.
126func (c *pageCache) next(ts time.Time) (p *page) {
127	if *memLog {
128		c.pageRequests++
129		if c.pageRequests&0xFFFF == 0 {
130			log.Println("PageCache:", c.pageRequests, "requested,", c.used, "used,", len(c.free), "free")
131		}
132	}
133	if len(c.free) == 0 {
134		c.grow()
135	}
136	i := len(c.free) - 1
137	p, c.free = c.free[i], c.free[:i]
138	p.prev = nil
139	p.next = nil
140	p.Seen = ts
141	p.Bytes = p.buf[:0]
142	c.used++
143	return p
144}
145
146// replace replaces a page into the pageCache.
147func (c *pageCache) replace(p *page) {
148	c.used--
149	c.free = append(c.free, p)
150}
151
152// Stream is implemented by the caller to handle incoming reassembled TCP data.
153// Callers create a StreamFactory, then StreamPool uses it to create a new
154// Stream for every TCP stream.
155//
156// assembly will, in order:
157//    1) Create the stream via StreamFactory.New
158//    2) Call Reassembled 0 or more times, passing in reassembled TCP data in
159//    	 order
160//    3) Call ReassemblyComplete one time, after which the stream is
161//       dereferenced by assembly.
162type Stream interface {
163	// Reassembled is called zero or more times. Assembly guarantees that the
164	// set of all Reassembly objects passed in during all calls are presented in
165	// the order they appear in the TCP stream. Reassembly objects are reused
166	// after each Reassembled call, so it's important to copy anything you need
167	// out of them (specifically out of Reassembly.Bytes) that you need to stay
168	// around after you return from the Reassembled call.
169	Reassembled([]Reassembly)
170	// ReassemblyComplete is called when assembly decides there is no more data
171	// for this Stream, either because a FIN or RST packet was seen, or because
172	// the stream has timed out without any new packet data (due to a call to
173	// FlushOlderThan).
174	ReassemblyComplete()
175}
176
177// StreamFactory is used by assembly to create a new stream for each new TCP
178// session.
179type StreamFactory interface {
180	// New should return a new stream for the given TCP key.
181	New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream
182}
183
184func (p *StreamPool) connections() []*connection {
185	p.mu.RLock()
186	conns := make([]*connection, 0, len(p.conns))
187	for _, conn := range p.conns {
188		conns = append(conns, conn)
189	}
190	p.mu.RUnlock()
191	return conns
192}
193
194// FlushOlderThan finds any streams waiting for packets older than the given
195// time, and pushes through the data they have (IE: tells them to stop waiting
196// and skip the data they're waiting for).
197//
198// Each Stream maintains a list of zero or more sets of bytes it has received
199// out-of-order.  For example, if it has processed up through sequence number
200// 10, it might have bytes [15-20), [20-25), [30,50) in its list.  Each set of
201// bytes also has the timestamp it was originally viewed.  A flush call will
202// look at the smallest subsequent set of bytes, in this case [15-20), and if
203// its timestamp is older than the passed-in time, it will push it and all
204// contiguous byte-sets out to the Stream's Reassembled function.  In this case,
205// it will push [15-20), but also [20-25), since that's contiguous.  It will
206// only push [30-50) if its timestamp is also older than the passed-in time,
207// otherwise it will wait until the next FlushOlderThan to see if bytes [25-30)
208// come in.
209//
210// If it pushes all bytes (or there were no sets of bytes to begin with) AND the
211// connection has not received any bytes since the passed-in time, the
212// connection will be closed.
213//
214// Returns the number of connections flushed, and of those, the number closed
215// because of the flush.
216func (a *Assembler) FlushOlderThan(t time.Time) (flushed, closed int) {
217	conns := a.connPool.connections()
218	closes := 0
219	flushes := 0
220	for _, conn := range conns {
221		flushed := false
222		conn.mu.Lock()
223		if conn.closed {
224			// Already closed connection, nothing to do here.
225			conn.mu.Unlock()
226			continue
227		}
228		for conn.first != nil && conn.first.Seen.Before(t) {
229			a.skipFlush(conn)
230			flushed = true
231			if conn.closed {
232				closes++
233				break
234			}
235		}
236		if !conn.closed && conn.first == nil && conn.lastSeen.Before(t) {
237			flushed = true
238			a.closeConnection(conn)
239			closes++
240		}
241		if flushed {
242			flushes++
243		}
244		conn.mu.Unlock()
245	}
246	return flushes, closes
247}
248
249// FlushAll flushes all remaining data into all remaining connections, closing
250// those connections.  It returns the total number of connections flushed/closed
251// by the call.
252func (a *Assembler) FlushAll() (closed int) {
253	conns := a.connPool.connections()
254	closed = len(conns)
255	for _, conn := range conns {
256		conn.mu.Lock()
257		for !conn.closed {
258			a.skipFlush(conn)
259		}
260		conn.mu.Unlock()
261	}
262	return
263}
264
265type key [2]gopacket.Flow
266
267func (k *key) String() string {
268	return fmt.Sprintf("%s:%s", k[0], k[1])
269}
270
271// StreamPool stores all streams created by Assemblers, allowing multiple
272// assemblers to work together on stream processing while enforcing the fact
273// that a single stream receives its data serially. It is safe for concurrency,
274// usable by multiple Assemblers at once.
275//
276// StreamPool handles the creation and storage of Stream objects used by one or
277// more Assembler objects.  When a new TCP stream is found by an Assembler, it
278// creates an associated Stream by calling its StreamFactory's New method.
279// Thereafter (until the stream is closed), that Stream object will receive
280// assembled TCP data via Assembler's calls to the stream's Reassembled
281// function.
282//
283// Like the Assembler, StreamPool attempts to minimize allocation.  Unlike the
284// Assembler, though, it does have to do some locking to make sure that the
285// connection objects it stores are accessible to multiple Assemblers.
286type StreamPool struct {
287	conns              map[key]*connection
288	users              int
289	mu                 sync.RWMutex
290	factory            StreamFactory
291	free               []*connection
292	all                [][]connection
293	nextAlloc          int
294	newConnectionCount int64
295}
296
297func (p *StreamPool) grow() {
298	conns := make([]connection, p.nextAlloc)
299	p.all = append(p.all, conns)
300	for i := range conns {
301		p.free = append(p.free, &conns[i])
302	}
303	if *memLog {
304		log.Println("StreamPool: created", p.nextAlloc, "new connections")
305	}
306	p.nextAlloc *= 2
307}
308
309// NewStreamPool creates a new connection pool. Streams will
310// be created as necessary using the passed-in StreamFactory.
311func NewStreamPool(factory StreamFactory) *StreamPool {
312	return &StreamPool{
313		conns:     make(map[key]*connection, initialAllocSize),
314		free:      make([]*connection, 0, initialAllocSize),
315		factory:   factory,
316		nextAlloc: initialAllocSize,
317	}
318}
319
320const assemblerReturnValueInitialSize = 16
321
322// NewAssembler creates a new assembler. Pass in the StreamPool
323// to use, may be shared across assemblers.
324//
325// This sets some sane defaults for the assembler options,
326// see DefaultAssemblerOptions for details.
327func NewAssembler(pool *StreamPool) *Assembler {
328	pool.mu.Lock()
329	pool.users++
330	pool.mu.Unlock()
331	return &Assembler{
332		ret:              make([]tcpassembly.Reassembly, assemblerReturnValueInitialSize),
333		pc:               newPageCache(),
334		connPool:         pool,
335		AssemblerOptions: DefaultAssemblerOptions,
336	}
337}
338
339// DefaultAssemblerOptions provides default options for an assembler.
340// These options are used by default when calling NewAssembler, so if
341// modified before a NewAssembler call they'll affect the resulting Assembler.
342//
343// Note that the default options can result in ever-increasing memory usage
344// unless one of the Flush* methods is called on a regular basis.
345var DefaultAssemblerOptions = AssemblerOptions{
346	MaxBufferedPagesPerConnection: 0, // unlimited
347	MaxBufferedPagesTotal:         0, // unlimited
348}
349
350type connection struct {
351	key               key
352	pages             int
353	first, last       *page
354	nextSeq           Sequence
355	created, lastSeen time.Time
356	stream            tcpassembly.Stream
357	closed            bool
358	mu                sync.Mutex
359}
360
361func (conn *connection) reset(k key, s tcpassembly.Stream, ts time.Time) {
362	conn.key = k
363	conn.pages = 0
364	conn.first, conn.last = nil, nil
365	conn.nextSeq = invalidSequence
366	conn.created = ts
367	conn.stream = s
368	conn.closed = false
369}
370
// AssemblerOptions tunes how an assembler buffers data.  Each assembler has
// its own copy, so adjust the options on the assemblers you create.
type AssemblerOptions struct {
	// MaxBufferedPagesTotal caps the total number of pages buffered while
	// waiting for out-of-order packets.  When the cap is hit, the assembler
	// falls back to flushing every connection it receives a packet for.
	// Values <= 0 disable the cap.
	MaxBufferedPagesTotal int
	// MaxBufferedPagesPerConnection caps the number of pages buffered for
	// any single connection.  When a connection hits the cap, its smallest
	// buffered sequence number is flushed together with any data contiguous
	// with it.  Values <= 0 disable the cap.
	MaxBufferedPagesPerConnection int
}
385
386// Assembler handles reassembling TCP streams.  It is not safe for
387// concurrency... after passing a packet in via the Assemble call, the caller
388// must wait for that call to return before calling Assemble again.  Callers can
389// get around this by creating multiple assemblers that share a StreamPool.  In
390// that case, each individual stream will still be handled serially (each stream
391// has an individual mutex associated with it), however multiple assemblers can
392// assemble different connections concurrently.
393//
394// The Assembler provides (hopefully) fast TCP stream re-assembly for sniffing
395// applications written in Go.  The Assembler uses the following methods to be
396// as fast as possible, to keep packet processing speedy:
397//
398// Avoids Lock Contention
399//
400// Assemblers locks connections, but each connection has an individual lock, and
401// rarely will two Assemblers be looking at the same connection.  Assemblers
402// lock the StreamPool when looking up connections, but they use Reader locks
403// initially, and only force a write lock if they need to create a new
404// connection or close one down.  These happen much less frequently than
405// individual packet handling.
406//
407// Each assembler runs in its own goroutine, and the only state shared between
408// goroutines is through the StreamPool.  Thus all internal Assembler state can
409// be handled without any locking.
410//
411// NOTE:  If you can guarantee that packets going to a set of Assemblers will
412// contain information on different connections per Assembler (for example,
413// they're already hashed by PF_RING hashing or some other hashing mechanism),
414// then we recommend you use a separate StreamPool per Assembler, thus avoiding
415// all lock contention.  Only when different Assemblers could receive packets
416// for the same Stream should a StreamPool be shared between them.
417//
418// Avoids Memory Copying
419//
420// In the common case, handling of a single TCP packet should result in zero
421// memory allocations.  The Assembler will look up the connection, figure out
422// that the packet has arrived in order, and immediately pass that packet on to
423// the appropriate connection's handling code.  Only if a packet arrives out of
424// order is its contents copied and stored in memory for later.
425//
426// Avoids Memory Allocation
427//
428// Assemblers try very hard to not use memory allocation unless absolutely
429// necessary.  Packet data for sequential packets is passed directly to streams
430// with no copying or allocation.  Packet data for out-of-order packets is
431// copied into reusable pages, and new pages are only allocated rarely when the
432// page cache runs out.  Page caches are Assembler-specific, thus not used
433// concurrently and requiring no locking.
434//
435// Internal representations for connection objects are also reused over time.
436// Because of this, the most common memory allocation done by the Assembler is
437// generally what's done by the caller in StreamFactory.New.  If no allocation
438// is done there, then very little allocation is done ever, mostly to handle
439// large increases in bandwidth or numbers of connections.
440//
441// TODO:  The page caches used by an Assembler will grow to the size necessary
442// to handle a workload, and currently will never shrink.  This means that
443// traffic spikes can result in large memory usage which isn't garbage collected
444// when typical traffic levels return.
445type Assembler struct {
446	AssemblerOptions
447	ret      []tcpassembly.Reassembly
448	pc       *pageCache
449	connPool *StreamPool
450}
451
452func (p *StreamPool) newConnection(k key, s tcpassembly.Stream, ts time.Time) (c *connection) {
453	if *memLog {
454		p.newConnectionCount++
455		if p.newConnectionCount&0x7FFF == 0 {
456			log.Println("StreamPool:", p.newConnectionCount, "requests,", len(p.conns), "used,", len(p.free), "free")
457		}
458	}
459	if len(p.free) == 0 {
460		p.grow()
461	}
462	index := len(p.free) - 1
463	c, p.free = p.free[index], p.free[:index]
464	c.reset(k, s, ts)
465	return c
466}
467
468// getConnection returns a connection.  If end is true and a connection
469// does not already exist, returns nil.  This allows us to check for a
470// connection without actually creating one if it doesn't already exist.
471func (p *StreamPool) getConnection(k key, end bool, ts time.Time) *connection {
472	p.mu.RLock()
473	conn := p.conns[k]
474	p.mu.RUnlock()
475	if end || conn != nil {
476		return conn
477	}
478	s := p.factory.New(k[0], k[1])
479	p.mu.Lock()
480	conn = p.newConnection(k, s, ts)
481	if conn2 := p.conns[k]; conn2 != nil {
482		p.mu.Unlock()
483		return conn2
484	}
485	p.conns[k] = conn
486	p.mu.Unlock()
487	return conn
488}
489
490// Assemble calls AssembleWithTimestamp with the current timestamp, useful for
491// packets being read directly off the wire.
492func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP) {
493	a.AssembleWithTimestamp(netFlow, t, time.Now())
494}
495
496// AssembleWithTimestamp reassembles the given TCP packet into its appropriate
497// stream.
498//
499// The timestamp passed in must be the timestamp the packet was seen. For
500// packets read off the wire, time.Now() should be fine. For packets read from
501// PCAP files, CaptureInfo.Timestamp should be passed in. This timestamp will
502// affect which streams are flushed by a call to FlushOlderThan.
503//
504// Each Assemble call results in, in order:
505//
506//    zero or one calls to StreamFactory.New, creating a stream
507//    zero or one calls to Reassembled on a single stream
508//    zero or one calls to ReassemblyComplete on the same stream
509func (a *Assembler) AssembleWithTimestamp(netFlow gopacket.Flow, t *layers.TCP, timestamp time.Time) {
510	// Ignore empty TCP packets
511	if !t.SYN && !t.FIN && !t.RST && len(t.LayerPayload()) == 0 {
512		return
513	}
514
515	a.ret = a.ret[:0]
516	key := key{netFlow, t.TransportFlow()}
517	var conn *connection
518	// This for loop handles a race condition where a connection will close,
519	// lock the connection pool, and remove itself, but before it locked the
520	// connection pool it's returned to another Assemble statement.  This should
521	// loop 0-1 times for the VAST majority of cases.
522	for {
523		conn = a.connPool.getConnection(
524			key, !t.SYN && len(t.LayerPayload()) == 0, timestamp)
525		if conn == nil {
526			if *debugLog {
527				log.Printf("%v got empty packet on otherwise empty connection", key)
528			}
529			return
530		}
531		conn.mu.Lock()
532		if !conn.closed {
533			break
534		}
535		conn.mu.Unlock()
536	}
537	if conn.lastSeen.Before(timestamp) {
538		conn.lastSeen = timestamp
539	}
540	seq, bytes := Sequence(t.Seq), t.Payload
541
542	if conn.nextSeq == invalidSequence {
543		// Handling the first packet we've seen on the stream.
544		skip := 0
545		if !t.SYN {
546			// don't add 1 since we're just going to assume the sequence number
547			// without the SYN packet.
548			// stream was picked up somewhere in the middle, so indicate that we
549			// don't know how many packets came before it.
550			conn.nextSeq = seq.Add(len(bytes))
551			skip = -1
552		} else {
553			// for SYN packets, also increment the sequence number by 1
554			conn.nextSeq = seq.Add(len(bytes) + 1)
555		}
556		a.ret = append(a.ret, tcpassembly.Reassembly{
557			Bytes: bytes,
558			Skip:  skip,
559			Start: t.SYN,
560			Seen:  timestamp,
561		})
562		a.insertIntoConn(t, conn, timestamp)
563	} else if diff := conn.nextSeq.Difference(seq); diff > 0 {
564		a.insertIntoConn(t, conn, timestamp)
565	} else {
566		bytes, conn.nextSeq = byteSpan(conn.nextSeq, seq, bytes)
567		a.ret = append(a.ret, tcpassembly.Reassembly{
568			Bytes: bytes,
569			Skip:  0,
570			End:   t.RST || t.FIN,
571			Seen:  timestamp,
572		})
573	}
574	if len(a.ret) > 0 {
575		a.sendToConnection(conn)
576	}
577	conn.mu.Unlock()
578}
579
580func byteSpan(expected, received Sequence, bytes []byte) (toSend []byte, next Sequence) {
581	if expected == invalidSequence {
582		return bytes, received.Add(len(bytes))
583	}
584	span := int(received.Difference(expected))
585	if span <= 0 {
586		return bytes, received.Add(len(bytes))
587	} else if len(bytes) < span {
588		return nil, expected
589	}
590	return bytes[span:], expected.Add(len(bytes) - span)
591}
592
593// sendToConnection sends the current values in a.ret to the connection, closing
594// the connection if the last thing sent had End set.
595func (a *Assembler) sendToConnection(conn *connection) {
596	a.addContiguous(conn)
597	if conn.stream == nil {
598		panic("why?")
599	}
600	conn.stream.Reassembled(a.ret)
601	if a.ret[len(a.ret)-1].End {
602		a.closeConnection(conn)
603	}
604}
605
606// addContiguous adds contiguous byte-sets to a connection.
607func (a *Assembler) addContiguous(conn *connection) {
608	for conn.first != nil && conn.nextSeq.Difference(conn.first.seq) <= 0 {
609		a.addNextFromConn(conn)
610	}
611}
612
613// skipFlush skips the first set of bytes we're waiting for and returns the
614// first set of bytes we have.  If we have no bytes pending, it closes the
615// connection.
616func (a *Assembler) skipFlush(conn *connection) {
617	if *debugLog {
618		log.Printf("%v skipFlush %v", conn.key, conn.nextSeq)
619	}
620	if conn.first == nil {
621		a.closeConnection(conn)
622		return
623	}
624	a.ret = a.ret[:0]
625	a.addNextFromConn(conn)
626	a.addContiguous(conn)
627	a.sendToConnection(conn)
628}
629
630func (p *StreamPool) remove(conn *connection) {
631	p.mu.Lock()
632	delete(p.conns, conn.key)
633	p.free = append(p.free, conn)
634	p.mu.Unlock()
635}
636
637func (a *Assembler) closeConnection(conn *connection) {
638	if *debugLog {
639		log.Printf("%v closing", conn.key)
640	}
641	conn.stream.ReassemblyComplete()
642	conn.closed = true
643	a.connPool.remove(conn)
644	for p := conn.first; p != nil; p = p.next {
645		a.pc.replace(p)
646	}
647}
648
649// traverseConn traverses our doubly-linked list of pages for the correct
650// position to put the given sequence number.  Note that it traverses backwards,
651// starting at the highest sequence number and going down, since we assume the
652// common case is that TCP packets for a stream will appear in-order, with
653// minimal loss or packet reordering.
654func (conn *connection) traverseConn(seq Sequence) (prev, current *page) {
655	prev = conn.last
656	for prev != nil && prev.seq.Difference(seq) < 0 {
657		current = prev
658		prev = current.prev
659	}
660	return
661}
662
663// pushBetween inserts the doubly-linked list first-...-last in between the
664// nodes prev-next in another doubly-linked list.  If prev is nil, makes first
665// the new first page in the connection's list.  If next is nil, makes last the
666// new last page in the list.  first/last may point to the same page.
667func (conn *connection) pushBetween(prev, next, first, last *page) {
668	// Maintain our doubly linked list
669	if next == nil || conn.last == nil {
670		conn.last = last
671	} else {
672		last.next = next
673		next.prev = last
674	}
675	if prev == nil || conn.first == nil {
676		conn.first = first
677	} else {
678		first.prev = prev
679		prev.next = first
680	}
681}
682
683func (a *Assembler) insertIntoConn(t *layers.TCP, conn *connection, ts time.Time) {
684	if conn.first != nil && conn.first.seq == conn.nextSeq {
685		panic("wtf")
686	}
687	p, p2, numPages := a.pagesFromTCP(t, ts)
688	prev, current := conn.traverseConn(Sequence(t.Seq))
689	conn.pushBetween(prev, current, p, p2)
690	conn.pages += numPages
691	if (a.MaxBufferedPagesPerConnection > 0 && conn.pages >= a.MaxBufferedPagesPerConnection) ||
692		(a.MaxBufferedPagesTotal > 0 && a.pc.used >= a.MaxBufferedPagesTotal) {
693		if *debugLog {
694			log.Printf("%v hit max buffer size: %+v, %v, %v", conn.key, a.AssemblerOptions, conn.pages, a.pc.used)
695		}
696		a.addNextFromConn(conn)
697	}
698}
699
700// pagesFromTCP creates a page (or set of pages) from a TCP packet.  Note that
701// it should NEVER receive a SYN packet, as it doesn't handle sequences
702// correctly.
703//
704// It returns the first and last page in its doubly-linked list of new pages.
705func (a *Assembler) pagesFromTCP(t *layers.TCP, ts time.Time) (p, p2 *page, numPages int) {
706	first := a.pc.next(ts)
707	current := first
708	numPages++
709	seq, bytes := Sequence(t.Seq), t.Payload
710	for {
711		length := min(len(bytes), pageBytes)
712		current.Bytes = current.buf[:length]
713		copy(current.Bytes, bytes)
714		current.seq = seq
715		bytes = bytes[length:]
716		if len(bytes) == 0 {
717			break
718		}
719		seq = seq.Add(length)
720		current.next = a.pc.next(ts)
721		current.next.prev = current
722		current = current.next
723		numPages++
724	}
725	current.End = t.RST || t.FIN
726	return first, current, numPages
727}
728
729// addNextFromConn pops the first page from a connection off and adds it to the
730// return array.
731func (a *Assembler) addNextFromConn(conn *connection) {
732	if conn.nextSeq == invalidSequence {
733		conn.first.Skip = -1
734	} else if diff := conn.nextSeq.Difference(conn.first.seq); diff > 0 {
735		conn.first.Skip = int(diff)
736	}
737	conn.first.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, conn.first.seq, conn.first.Bytes)
738	if *debugLog {
739		log.Printf("%v   adding from conn (%v, %v)", conn.key, conn.first.seq, conn.nextSeq)
740	}
741	a.ret = append(a.ret, conn.first.Reassembly)
742	a.pc.replace(conn.first)
743	if conn.first == conn.last {
744		conn.first = nil
745		conn.last = nil
746	} else {
747		conn.first = conn.first.next
748		conn.first.prev = nil
749	}
750	conn.pages--
751}
752