1package yamux
2
3import (
4	"bufio"
5	"fmt"
6	"io"
7	"io/ioutil"
8	"log"
9	"math"
10	"net"
11	"strings"
12	"sync"
13	"sync/atomic"
14	"time"
15)
16
17// Session is used to wrap a reliable ordered connection and to
18// multiplex it into multiple streams.
19type Session struct {
20	// remoteGoAway indicates the remote side does
21	// not want futher connections. Must be first for alignment.
22	remoteGoAway int32
23
24	// localGoAway indicates that we should stop
25	// accepting futher connections. Must be first for alignment.
26	localGoAway int32
27
28	// nextStreamID is the next stream we should
29	// send. This depends if we are a client/server.
30	nextStreamID uint32
31
32	// config holds our configuration
33	config *Config
34
35	// logger is used for our logs
36	logger *log.Logger
37
38	// conn is the underlying connection
39	conn io.ReadWriteCloser
40
41	// bufRead is a buffered reader
42	bufRead *bufio.Reader
43
44	// pings is used to track inflight pings
45	pings    map[uint32]chan struct{}
46	pingID   uint32
47	pingLock sync.Mutex
48
49	// streams maps a stream id to a stream, and inflight has an entry
50	// for any outgoing stream that has not yet been established. Both are
51	// protected by streamLock.
52	streams    map[uint32]*Stream
53	inflight   map[uint32]struct{}
54	streamLock sync.Mutex
55
56	// synCh acts like a semaphore. It is sized to the AcceptBacklog which
57	// is assumed to be symmetric between the client and server. This allows
58	// the client to avoid exceeding the backlog and instead blocks the open.
59	synCh chan struct{}
60
61	// acceptCh is used to pass ready streams to the client
62	acceptCh chan *Stream
63
64	// sendCh is used to mark a stream as ready to send,
65	// or to send a header out directly.
66	sendCh chan sendReady
67
68	// recvDoneCh is closed when recv() exits to avoid a race
69	// between stream registration and stream shutdown
70	recvDoneCh chan struct{}
71
72	// shutdown is used to safely close a session
73	shutdown     bool
74	shutdownErr  error
75	shutdownCh   chan struct{}
76	shutdownLock sync.Mutex
77}
78
79// sendReady is used to either mark a stream as ready
80// or to directly send a header
81type sendReady struct {
82	Hdr  []byte
83	Body io.Reader
84	Err  chan error
85}
86
87// newSession is used to construct a new session
88func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
89	s := &Session{
90		config:     config,
91		logger:     log.New(config.LogOutput, "", log.LstdFlags),
92		conn:       conn,
93		bufRead:    bufio.NewReader(conn),
94		pings:      make(map[uint32]chan struct{}),
95		streams:    make(map[uint32]*Stream),
96		inflight:   make(map[uint32]struct{}),
97		synCh:      make(chan struct{}, config.AcceptBacklog),
98		acceptCh:   make(chan *Stream, config.AcceptBacklog),
99		sendCh:     make(chan sendReady, 64),
100		recvDoneCh: make(chan struct{}),
101		shutdownCh: make(chan struct{}),
102	}
103	if client {
104		s.nextStreamID = 1
105	} else {
106		s.nextStreamID = 2
107	}
108	go s.recv()
109	go s.send()
110	if config.EnableKeepAlive {
111		go s.keepalive()
112	}
113	return s
114}
115
116// IsClosed does a safe check to see if we have shutdown
117func (s *Session) IsClosed() bool {
118	select {
119	case <-s.shutdownCh:
120		return true
121	default:
122		return false
123	}
124}
125
126// CloseChan returns a read-only channel which is closed as
127// soon as the session is closed.
128func (s *Session) CloseChan() <-chan struct{} {
129	return s.shutdownCh
130}
131
132// NumStreams returns the number of currently open streams
133func (s *Session) NumStreams() int {
134	s.streamLock.Lock()
135	num := len(s.streams)
136	s.streamLock.Unlock()
137	return num
138}
139
140// Open is used to create a new stream as a net.Conn
141func (s *Session) Open() (net.Conn, error) {
142	conn, err := s.OpenStream()
143	if err != nil {
144		return nil, err
145	}
146	return conn, nil
147}
148
149// OpenStream is used to create a new stream
150func (s *Session) OpenStream() (*Stream, error) {
151	if s.IsClosed() {
152		return nil, ErrSessionShutdown
153	}
154	if atomic.LoadInt32(&s.remoteGoAway) == 1 {
155		return nil, ErrRemoteGoAway
156	}
157
158	// Block if we have too many inflight SYNs
159	select {
160	case s.synCh <- struct{}{}:
161	case <-s.shutdownCh:
162		return nil, ErrSessionShutdown
163	}
164
165GET_ID:
166	// Get an ID, and check for stream exhaustion
167	id := atomic.LoadUint32(&s.nextStreamID)
168	if id >= math.MaxUint32-1 {
169		return nil, ErrStreamsExhausted
170	}
171	if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) {
172		goto GET_ID
173	}
174
175	// Register the stream
176	stream := newStream(s, id, streamInit)
177	s.streamLock.Lock()
178	s.streams[id] = stream
179	s.inflight[id] = struct{}{}
180	s.streamLock.Unlock()
181
182	// Send the window update to create
183	if err := stream.sendWindowUpdate(); err != nil {
184		select {
185		case <-s.synCh:
186		default:
187			s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore")
188		}
189		return nil, err
190	}
191	return stream, nil
192}
193
194// Accept is used to block until the next available stream
195// is ready to be accepted.
196func (s *Session) Accept() (net.Conn, error) {
197	conn, err := s.AcceptStream()
198	if err != nil {
199		return nil, err
200	}
201	return conn, err
202}
203
204// AcceptStream is used to block until the next available stream
205// is ready to be accepted.
206func (s *Session) AcceptStream() (*Stream, error) {
207	select {
208	case stream := <-s.acceptCh:
209		if err := stream.sendWindowUpdate(); err != nil {
210			return nil, err
211		}
212		return stream, nil
213	case <-s.shutdownCh:
214		return nil, s.shutdownErr
215	}
216}
217
218// Close is used to close the session and all streams.
219// Attempts to send a GoAway before closing the connection.
220func (s *Session) Close() error {
221	s.shutdownLock.Lock()
222	defer s.shutdownLock.Unlock()
223
224	if s.shutdown {
225		return nil
226	}
227	s.shutdown = true
228	if s.shutdownErr == nil {
229		s.shutdownErr = ErrSessionShutdown
230	}
231	close(s.shutdownCh)
232	s.conn.Close()
233	<-s.recvDoneCh
234
235	s.streamLock.Lock()
236	defer s.streamLock.Unlock()
237	for _, stream := range s.streams {
238		stream.forceClose()
239	}
240	return nil
241}
242
243// exitErr is used to handle an error that is causing the
244// session to terminate.
245func (s *Session) exitErr(err error) {
246	s.shutdownLock.Lock()
247	if s.shutdownErr == nil {
248		s.shutdownErr = err
249	}
250	s.shutdownLock.Unlock()
251	s.Close()
252}
253
254// GoAway can be used to prevent accepting further
255// connections. It does not close the underlying conn.
256func (s *Session) GoAway() error {
257	return s.waitForSend(s.goAway(goAwayNormal), nil)
258}
259
260// goAway is used to send a goAway message
261func (s *Session) goAway(reason uint32) header {
262	atomic.SwapInt32(&s.localGoAway, 1)
263	hdr := header(make([]byte, headerSize))
264	hdr.encode(typeGoAway, 0, 0, reason)
265	return hdr
266}
267
268// Ping is used to measure the RTT response time
269func (s *Session) Ping() (time.Duration, error) {
270	// Get a channel for the ping
271	ch := make(chan struct{})
272
273	// Get a new ping id, mark as pending
274	s.pingLock.Lock()
275	id := s.pingID
276	s.pingID++
277	s.pings[id] = ch
278	s.pingLock.Unlock()
279
280	// Send the ping request
281	hdr := header(make([]byte, headerSize))
282	hdr.encode(typePing, flagSYN, 0, id)
283	if err := s.waitForSend(hdr, nil); err != nil {
284		return 0, err
285	}
286
287	// Wait for a response
288	start := time.Now()
289	select {
290	case <-ch:
291	case <-time.After(s.config.ConnectionWriteTimeout):
292		s.pingLock.Lock()
293		delete(s.pings, id) // Ignore it if a response comes later.
294		s.pingLock.Unlock()
295		return 0, ErrTimeout
296	case <-s.shutdownCh:
297		return 0, ErrSessionShutdown
298	}
299
300	// Compute the RTT
301	return time.Now().Sub(start), nil
302}
303
304// keepalive is a long running goroutine that periodically does
305// a ping to keep the connection alive.
306func (s *Session) keepalive() {
307	for {
308		select {
309		case <-time.After(s.config.KeepAliveInterval):
310			_, err := s.Ping()
311			if err != nil {
312				if err != ErrSessionShutdown {
313					s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
314					s.exitErr(ErrKeepAliveTimeout)
315				}
316				return
317			}
318		case <-s.shutdownCh:
319			return
320		}
321	}
322}
323
324// waitForSendErr waits to send a header, checking for a potential shutdown
325func (s *Session) waitForSend(hdr header, body io.Reader) error {
326	errCh := make(chan error, 1)
327	return s.waitForSendErr(hdr, body, errCh)
328}
329
330// waitForSendErr waits to send a header with optional data, checking for a
331// potential shutdown. Since there's the expectation that sends can happen
332// in a timely manner, we enforce the connection write timeout here.
333func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
334	t := timerPool.Get()
335	timer := t.(*time.Timer)
336	timer.Reset(s.config.ConnectionWriteTimeout)
337	defer func() {
338		timer.Stop()
339		select {
340		case <-timer.C:
341		default:
342		}
343		timerPool.Put(t)
344	}()
345
346	ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
347	select {
348	case s.sendCh <- ready:
349	case <-s.shutdownCh:
350		return ErrSessionShutdown
351	case <-timer.C:
352		return ErrConnectionWriteTimeout
353	}
354
355	select {
356	case err := <-errCh:
357		return err
358	case <-s.shutdownCh:
359		return ErrSessionShutdown
360	case <-timer.C:
361		return ErrConnectionWriteTimeout
362	}
363}
364
365// sendNoWait does a send without waiting. Since there's the expectation that
366// the send happens right here, we enforce the connection write timeout if we
367// can't queue the header to be sent.
368func (s *Session) sendNoWait(hdr header) error {
369	t := timerPool.Get()
370	timer := t.(*time.Timer)
371	timer.Reset(s.config.ConnectionWriteTimeout)
372	defer func() {
373		timer.Stop()
374		select {
375		case <-timer.C:
376		default:
377		}
378		timerPool.Put(t)
379	}()
380
381	select {
382	case s.sendCh <- sendReady{Hdr: hdr}:
383		return nil
384	case <-s.shutdownCh:
385		return ErrSessionShutdown
386	case <-timer.C:
387		return ErrConnectionWriteTimeout
388	}
389}
390
391// send is a long running goroutine that sends data
392func (s *Session) send() {
393	for {
394		select {
395		case ready := <-s.sendCh:
396			// Send a header if ready
397			if ready.Hdr != nil {
398				sent := 0
399				for sent < len(ready.Hdr) {
400					n, err := s.conn.Write(ready.Hdr[sent:])
401					if err != nil {
402						s.logger.Printf("[ERR] yamux: Failed to write header: %v", err)
403						asyncSendErr(ready.Err, err)
404						s.exitErr(err)
405						return
406					}
407					sent += n
408				}
409			}
410
411			// Send data from a body if given
412			if ready.Body != nil {
413				_, err := io.Copy(s.conn, ready.Body)
414				if err != nil {
415					s.logger.Printf("[ERR] yamux: Failed to write body: %v", err)
416					asyncSendErr(ready.Err, err)
417					s.exitErr(err)
418					return
419				}
420			}
421
422			// No error, successful send
423			asyncSendErr(ready.Err, nil)
424		case <-s.shutdownCh:
425			return
426		}
427	}
428}
429
430// recv is a long running goroutine that accepts new data
431func (s *Session) recv() {
432	if err := s.recvLoop(); err != nil {
433		s.exitErr(err)
434	}
435}
436
437// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
438var (
439	handlers = []func(*Session, header) error{
440		typeData:         (*Session).handleStreamMessage,
441		typeWindowUpdate: (*Session).handleStreamMessage,
442		typePing:         (*Session).handlePing,
443		typeGoAway:       (*Session).handleGoAway,
444	}
445)
446
447// recvLoop continues to receive data until a fatal error is encountered
448func (s *Session) recvLoop() error {
449	defer close(s.recvDoneCh)
450	hdr := header(make([]byte, headerSize))
451	for {
452		// Read the header
453		if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
454			if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") {
455				s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
456			}
457			return err
458		}
459
460		// Verify the version
461		if hdr.Version() != protoVersion {
462			s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
463			return ErrInvalidVersion
464		}
465
466		mt := hdr.MsgType()
467		if mt < typeData || mt > typeGoAway {
468			return ErrInvalidMsgType
469		}
470
471		if err := handlers[mt](s, hdr); err != nil {
472			return err
473		}
474	}
475}
476
477// handleStreamMessage handles either a data or window update frame
478func (s *Session) handleStreamMessage(hdr header) error {
479	// Check for a new stream creation
480	id := hdr.StreamID()
481	flags := hdr.Flags()
482	if flags&flagSYN == flagSYN {
483		if err := s.incomingStream(id); err != nil {
484			return err
485		}
486	}
487
488	// Get the stream
489	s.streamLock.Lock()
490	stream := s.streams[id]
491	s.streamLock.Unlock()
492
493	// If we do not have a stream, likely we sent a RST
494	if stream == nil {
495		// Drain any data on the wire
496		if hdr.MsgType() == typeData && hdr.Length() > 0 {
497			s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id)
498			if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil {
499				s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err)
500				return nil
501			}
502		} else {
503			s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr)
504		}
505		return nil
506	}
507
508	// Check if this is a window update
509	if hdr.MsgType() == typeWindowUpdate {
510		if err := stream.incrSendWindow(hdr, flags); err != nil {
511			if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
512				s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
513			}
514			return err
515		}
516		return nil
517	}
518
519	// Read the new data
520	if err := stream.readData(hdr, flags, s.bufRead); err != nil {
521		if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
522			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
523		}
524		return err
525	}
526	return nil
527}
528
529// handlePing is invokde for a typePing frame
530func (s *Session) handlePing(hdr header) error {
531	flags := hdr.Flags()
532	pingID := hdr.Length()
533
534	// Check if this is a query, respond back in a separate context so we
535	// don't interfere with the receiving thread blocking for the write.
536	if flags&flagSYN == flagSYN {
537		go func() {
538			hdr := header(make([]byte, headerSize))
539			hdr.encode(typePing, flagACK, 0, pingID)
540			if err := s.sendNoWait(hdr); err != nil {
541				s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err)
542			}
543		}()
544		return nil
545	}
546
547	// Handle a response
548	s.pingLock.Lock()
549	ch := s.pings[pingID]
550	if ch != nil {
551		delete(s.pings, pingID)
552		close(ch)
553	}
554	s.pingLock.Unlock()
555	return nil
556}
557
558// handleGoAway is invokde for a typeGoAway frame
559func (s *Session) handleGoAway(hdr header) error {
560	code := hdr.Length()
561	switch code {
562	case goAwayNormal:
563		atomic.SwapInt32(&s.remoteGoAway, 1)
564	case goAwayProtoErr:
565		s.logger.Printf("[ERR] yamux: received protocol error go away")
566		return fmt.Errorf("yamux protocol error")
567	case goAwayInternalErr:
568		s.logger.Printf("[ERR] yamux: received internal error go away")
569		return fmt.Errorf("remote yamux internal error")
570	default:
571		s.logger.Printf("[ERR] yamux: received unexpected go away")
572		return fmt.Errorf("unexpected go away received")
573	}
574	return nil
575}
576
577// incomingStream is used to create a new incoming stream
578func (s *Session) incomingStream(id uint32) error {
579	// Reject immediately if we are doing a go away
580	if atomic.LoadInt32(&s.localGoAway) == 1 {
581		hdr := header(make([]byte, headerSize))
582		hdr.encode(typeWindowUpdate, flagRST, id, 0)
583		return s.sendNoWait(hdr)
584	}
585
586	// Allocate a new stream
587	stream := newStream(s, id, streamSYNReceived)
588
589	s.streamLock.Lock()
590	defer s.streamLock.Unlock()
591
592	// Check if stream already exists
593	if _, ok := s.streams[id]; ok {
594		s.logger.Printf("[ERR] yamux: duplicate stream declared")
595		if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
596			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
597		}
598		return ErrDuplicateStream
599	}
600
601	// Register the stream
602	s.streams[id] = stream
603
604	// Check if we've exceeded the backlog
605	select {
606	case s.acceptCh <- stream:
607		return nil
608	default:
609		// Backlog exceeded! RST the stream
610		s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
611		delete(s.streams, id)
612		stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0)
613		return s.sendNoWait(stream.sendHdr)
614	}
615}
616
617// closeStream is used to close a stream once both sides have
618// issued a close. If there was an in-flight SYN and the stream
619// was not yet established, then this will give the credit back.
620func (s *Session) closeStream(id uint32) {
621	s.streamLock.Lock()
622	if _, ok := s.inflight[id]; ok {
623		select {
624		case <-s.synCh:
625		default:
626			s.logger.Printf("[ERR] yamux: SYN tracking out of sync")
627		}
628	}
629	delete(s.streams, id)
630	s.streamLock.Unlock()
631}
632
633// establishStream is used to mark a stream that was in the
634// SYN Sent state as established.
635func (s *Session) establishStream(id uint32) {
636	s.streamLock.Lock()
637	if _, ok := s.inflight[id]; ok {
638		delete(s.inflight, id)
639	} else {
640		s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)")
641	}
642	select {
643	case <-s.synCh:
644	default:
645		s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)")
646	}
647	s.streamLock.Unlock()
648}
649