1package yamux 2 3import ( 4 "bufio" 5 "fmt" 6 "io" 7 "io/ioutil" 8 "log" 9 "math" 10 "net" 11 "strings" 12 "sync" 13 "sync/atomic" 14 "time" 15) 16 17// Session is used to wrap a reliable ordered connection and to 18// multiplex it into multiple streams. 19type Session struct { 20 // remoteGoAway indicates the remote side does 21 // not want futher connections. Must be first for alignment. 22 remoteGoAway int32 23 24 // localGoAway indicates that we should stop 25 // accepting futher connections. Must be first for alignment. 26 localGoAway int32 27 28 // nextStreamID is the next stream we should 29 // send. This depends if we are a client/server. 30 nextStreamID uint32 31 32 // config holds our configuration 33 config *Config 34 35 // logger is used for our logs 36 logger *log.Logger 37 38 // conn is the underlying connection 39 conn io.ReadWriteCloser 40 41 // bufRead is a buffered reader 42 bufRead *bufio.Reader 43 44 // pings is used to track inflight pings 45 pings map[uint32]chan struct{} 46 pingID uint32 47 pingLock sync.Mutex 48 49 // streams maps a stream id to a stream, and inflight has an entry 50 // for any outgoing stream that has not yet been established. Both are 51 // protected by streamLock. 52 streams map[uint32]*Stream 53 inflight map[uint32]struct{} 54 streamLock sync.Mutex 55 56 // synCh acts like a semaphore. It is sized to the AcceptBacklog which 57 // is assumed to be symmetric between the client and server. This allows 58 // the client to avoid exceeding the backlog and instead blocks the open. 59 synCh chan struct{} 60 61 // acceptCh is used to pass ready streams to the client 62 acceptCh chan *Stream 63 64 // sendCh is used to mark a stream as ready to send, 65 // or to send a header out directly. 66 sendCh chan sendReady 67 68 // recvDoneCh is closed when recv() exits to avoid a race 69 // between stream registration and stream shutdown 70 recvDoneCh chan struct{} 71 72 // shutdown is used to safely close a session 73 shutdown bool 74 shutdownErr error 75 shutdownCh chan struct{} 76 shutdownLock sync.Mutex 77} 78 79// sendReady is used to either mark a stream as ready 80// or to directly send a header 81type sendReady struct { 82 Hdr []byte 83 Body io.Reader 84 Err chan error 85} 86 87// newSession is used to construct a new session 88func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { 89 s := &Session{ 90 config: config, 91 logger: log.New(config.LogOutput, "", log.LstdFlags), 92 conn: conn, 93 bufRead: bufio.NewReader(conn), 94 pings: make(map[uint32]chan struct{}), 95 streams: make(map[uint32]*Stream), 96 inflight: make(map[uint32]struct{}), 97 synCh: make(chan struct{}, config.AcceptBacklog), 98 acceptCh: make(chan *Stream, config.AcceptBacklog), 99 sendCh: make(chan sendReady, 64), 100 recvDoneCh: make(chan struct{}), 101 shutdownCh: make(chan struct{}), 102 } 103 if client { 104 s.nextStreamID = 1 105 } else { 106 s.nextStreamID = 2 107 } 108 go s.recv() 109 go s.send() 110 if config.EnableKeepAlive { 111 go s.keepalive() 112 } 113 return s 114} 115 116// IsClosed does a safe check to see if we have shutdown 117func (s *Session) IsClosed() bool { 118 select { 119 case <-s.shutdownCh: 120 return true 121 default: 122 return false 123 } 124} 125 126// CloseChan returns a read-only channel which is closed as 127// soon as the session is closed. 128func (s *Session) CloseChan() <-chan struct{} { 129 return s.shutdownCh 130} 131 132// NumStreams returns the number of currently open streams 133func (s *Session) NumStreams() int { 134 s.streamLock.Lock() 135 num := len(s.streams) 136 s.streamLock.Unlock() 137 return num 138} 139 140// Open is used to create a new stream as a net.Conn 141func (s *Session) Open() (net.Conn, error) { 142 conn, err := s.OpenStream() 143 if err != nil { 144 return nil, err 145 } 146 return conn, nil 147} 148 149// OpenStream is used to create a new stream 150func (s *Session) OpenStream() (*Stream, error) { 151 if s.IsClosed() { 152 return nil, ErrSessionShutdown 153 } 154 if atomic.LoadInt32(&s.remoteGoAway) == 1 { 155 return nil, ErrRemoteGoAway 156 } 157 158 // Block if we have too many inflight SYNs 159 select { 160 case s.synCh <- struct{}{}: 161 case <-s.shutdownCh: 162 return nil, ErrSessionShutdown 163 } 164 165GET_ID: 166 // Get an ID, and check for stream exhaustion 167 id := atomic.LoadUint32(&s.nextStreamID) 168 if id >= math.MaxUint32-1 { 169 return nil, ErrStreamsExhausted 170 } 171 if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) { 172 goto GET_ID 173 } 174 175 // Register the stream 176 stream := newStream(s, id, streamInit) 177 s.streamLock.Lock() 178 s.streams[id] = stream 179 s.inflight[id] = struct{}{} 180 s.streamLock.Unlock() 181 182 // Send the window update to create 183 if err := stream.sendWindowUpdate(); err != nil { 184 select { 185 case <-s.synCh: 186 default: 187 s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore") 188 } 189 return nil, err 190 } 191 return stream, nil 192} 193 194// Accept is used to block until the next available stream 195// is ready to be accepted. 196func (s *Session) Accept() (net.Conn, error) { 197 conn, err := s.AcceptStream() 198 if err != nil { 199 return nil, err 200 } 201 return conn, err 202} 203 204// AcceptStream is used to block until the next available stream 205// is ready to be accepted. 206func (s *Session) AcceptStream() (*Stream, error) { 207 select { 208 case stream := <-s.acceptCh: 209 if err := stream.sendWindowUpdate(); err != nil { 210 return nil, err 211 } 212 return stream, nil 213 case <-s.shutdownCh: 214 return nil, s.shutdownErr 215 } 216} 217 218// Close is used to close the session and all streams. 219// Attempts to send a GoAway before closing the connection. 220func (s *Session) Close() error { 221 s.shutdownLock.Lock() 222 defer s.shutdownLock.Unlock() 223 224 if s.shutdown { 225 return nil 226 } 227 s.shutdown = true 228 if s.shutdownErr == nil { 229 s.shutdownErr = ErrSessionShutdown 230 } 231 close(s.shutdownCh) 232 s.conn.Close() 233 <-s.recvDoneCh 234 235 s.streamLock.Lock() 236 defer s.streamLock.Unlock() 237 for _, stream := range s.streams { 238 stream.forceClose() 239 } 240 return nil 241} 242 243// exitErr is used to handle an error that is causing the 244// session to terminate. 245func (s *Session) exitErr(err error) { 246 s.shutdownLock.Lock() 247 if s.shutdownErr == nil { 248 s.shutdownErr = err 249 } 250 s.shutdownLock.Unlock() 251 s.Close() 252} 253 254// GoAway can be used to prevent accepting further 255// connections. It does not close the underlying conn. 256func (s *Session) GoAway() error { 257 return s.waitForSend(s.goAway(goAwayNormal), nil) 258} 259 260// goAway is used to send a goAway message 261func (s *Session) goAway(reason uint32) header { 262 atomic.SwapInt32(&s.localGoAway, 1) 263 hdr := header(make([]byte, headerSize)) 264 hdr.encode(typeGoAway, 0, 0, reason) 265 return hdr 266} 267 268// Ping is used to measure the RTT response time 269func (s *Session) Ping() (time.Duration, error) { 270 // Get a channel for the ping 271 ch := make(chan struct{}) 272 273 // Get a new ping id, mark as pending 274 s.pingLock.Lock() 275 id := s.pingID 276 s.pingID++ 277 s.pings[id] = ch 278 s.pingLock.Unlock() 279 280 // Send the ping request 281 hdr := header(make([]byte, headerSize)) 282 hdr.encode(typePing, flagSYN, 0, id) 283 if err := s.waitForSend(hdr, nil); err != nil { 284 return 0, err 285 } 286 287 // Wait for a response 288 start := time.Now() 289 select { 290 case <-ch: 291 case <-time.After(s.config.ConnectionWriteTimeout): 292 s.pingLock.Lock() 293 delete(s.pings, id) // Ignore it if a response comes later. 294 s.pingLock.Unlock() 295 return 0, ErrTimeout 296 case <-s.shutdownCh: 297 return 0, ErrSessionShutdown 298 } 299 300 // Compute the RTT 301 return time.Now().Sub(start), nil 302} 303 304// keepalive is a long running goroutine that periodically does 305// a ping to keep the connection alive. 306func (s *Session) keepalive() { 307 for { 308 select { 309 case <-time.After(s.config.KeepAliveInterval): 310 _, err := s.Ping() 311 if err != nil { 312 if err != ErrSessionShutdown { 313 s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) 314 s.exitErr(ErrKeepAliveTimeout) 315 } 316 return 317 } 318 case <-s.shutdownCh: 319 return 320 } 321 } 322} 323 324// waitForSendErr waits to send a header, checking for a potential shutdown 325func (s *Session) waitForSend(hdr header, body io.Reader) error { 326 errCh := make(chan error, 1) 327 return s.waitForSendErr(hdr, body, errCh) 328} 329 330// waitForSendErr waits to send a header with optional data, checking for a 331// potential shutdown. Since there's the expectation that sends can happen 332// in a timely manner, we enforce the connection write timeout here. 333func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { 334 t := timerPool.Get() 335 timer := t.(*time.Timer) 336 timer.Reset(s.config.ConnectionWriteTimeout) 337 defer func() { 338 timer.Stop() 339 select { 340 case <-timer.C: 341 default: 342 } 343 timerPool.Put(t) 344 }() 345 346 ready := sendReady{Hdr: hdr, Body: body, Err: errCh} 347 select { 348 case s.sendCh <- ready: 349 case <-s.shutdownCh: 350 return ErrSessionShutdown 351 case <-timer.C: 352 return ErrConnectionWriteTimeout 353 } 354 355 select { 356 case err := <-errCh: 357 return err 358 case <-s.shutdownCh: 359 return ErrSessionShutdown 360 case <-timer.C: 361 return ErrConnectionWriteTimeout 362 } 363} 364 365// sendNoWait does a send without waiting. Since there's the expectation that 366// the send happens right here, we enforce the connection write timeout if we 367// can't queue the header to be sent. 368func (s *Session) sendNoWait(hdr header) error { 369 t := timerPool.Get() 370 timer := t.(*time.Timer) 371 timer.Reset(s.config.ConnectionWriteTimeout) 372 defer func() { 373 timer.Stop() 374 select { 375 case <-timer.C: 376 default: 377 } 378 timerPool.Put(t) 379 }() 380 381 select { 382 case s.sendCh <- sendReady{Hdr: hdr}: 383 return nil 384 case <-s.shutdownCh: 385 return ErrSessionShutdown 386 case <-timer.C: 387 return ErrConnectionWriteTimeout 388 } 389} 390 391// send is a long running goroutine that sends data 392func (s *Session) send() { 393 for { 394 select { 395 case ready := <-s.sendCh: 396 // Send a header if ready 397 if ready.Hdr != nil { 398 sent := 0 399 for sent < len(ready.Hdr) { 400 n, err := s.conn.Write(ready.Hdr[sent:]) 401 if err != nil { 402 s.logger.Printf("[ERR] yamux: Failed to write header: %v", err) 403 asyncSendErr(ready.Err, err) 404 s.exitErr(err) 405 return 406 } 407 sent += n 408 } 409 } 410 411 // Send data from a body if given 412 if ready.Body != nil { 413 _, err := io.Copy(s.conn, ready.Body) 414 if err != nil { 415 s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) 416 asyncSendErr(ready.Err, err) 417 s.exitErr(err) 418 return 419 } 420 } 421 422 // No error, successful send 423 asyncSendErr(ready.Err, nil) 424 case <-s.shutdownCh: 425 return 426 } 427 } 428} 429 430// recv is a long running goroutine that accepts new data 431func (s *Session) recv() { 432 if err := s.recvLoop(); err != nil { 433 s.exitErr(err) 434 } 435} 436 437// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type 438var ( 439 handlers = []func(*Session, header) error{ 440 typeData: (*Session).handleStreamMessage, 441 typeWindowUpdate: (*Session).handleStreamMessage, 442 typePing: (*Session).handlePing, 443 typeGoAway: (*Session).handleGoAway, 444 } 445) 446 447// recvLoop continues to receive data until a fatal error is encountered 448func (s *Session) recvLoop() error { 449 defer close(s.recvDoneCh) 450 hdr := header(make([]byte, headerSize)) 451 for { 452 // Read the header 453 if _, err := io.ReadFull(s.bufRead, hdr); err != nil { 454 if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") { 455 s.logger.Printf("[ERR] yamux: Failed to read header: %v", err) 456 } 457 return err 458 } 459 460 // Verify the version 461 if hdr.Version() != protoVersion { 462 s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version()) 463 return ErrInvalidVersion 464 } 465 466 mt := hdr.MsgType() 467 if mt < typeData || mt > typeGoAway { 468 return ErrInvalidMsgType 469 } 470 471 if err := handlers[mt](s, hdr); err != nil { 472 return err 473 } 474 } 475} 476 477// handleStreamMessage handles either a data or window update frame 478func (s *Session) handleStreamMessage(hdr header) error { 479 // Check for a new stream creation 480 id := hdr.StreamID() 481 flags := hdr.Flags() 482 if flags&flagSYN == flagSYN { 483 if err := s.incomingStream(id); err != nil { 484 return err 485 } 486 } 487 488 // Get the stream 489 s.streamLock.Lock() 490 stream := s.streams[id] 491 s.streamLock.Unlock() 492 493 // If we do not have a stream, likely we sent a RST 494 if stream == nil { 495 // Drain any data on the wire 496 if hdr.MsgType() == typeData && hdr.Length() > 0 { 497 s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id) 498 if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil { 499 s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err) 500 return nil 501 } 502 } else { 503 s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr) 504 } 505 return nil 506 } 507 508 // Check if this is a window update 509 if hdr.MsgType() == typeWindowUpdate { 510 if err := stream.incrSendWindow(hdr, flags); err != nil { 511 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { 512 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) 513 } 514 return err 515 } 516 return nil 517 } 518 519 // Read the new data 520 if err := stream.readData(hdr, flags, s.bufRead); err != nil { 521 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { 522 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) 523 } 524 return err 525 } 526 return nil 527} 528 529// handlePing is invokde for a typePing frame 530func (s *Session) handlePing(hdr header) error { 531 flags := hdr.Flags() 532 pingID := hdr.Length() 533 534 // Check if this is a query, respond back in a separate context so we 535 // don't interfere with the receiving thread blocking for the write. 536 if flags&flagSYN == flagSYN { 537 go func() { 538 hdr := header(make([]byte, headerSize)) 539 hdr.encode(typePing, flagACK, 0, pingID) 540 if err := s.sendNoWait(hdr); err != nil { 541 s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err) 542 } 543 }() 544 return nil 545 } 546 547 // Handle a response 548 s.pingLock.Lock() 549 ch := s.pings[pingID] 550 if ch != nil { 551 delete(s.pings, pingID) 552 close(ch) 553 } 554 s.pingLock.Unlock() 555 return nil 556} 557 558// handleGoAway is invokde for a typeGoAway frame 559func (s *Session) handleGoAway(hdr header) error { 560 code := hdr.Length() 561 switch code { 562 case goAwayNormal: 563 atomic.SwapInt32(&s.remoteGoAway, 1) 564 case goAwayProtoErr: 565 s.logger.Printf("[ERR] yamux: received protocol error go away") 566 return fmt.Errorf("yamux protocol error") 567 case goAwayInternalErr: 568 s.logger.Printf("[ERR] yamux: received internal error go away") 569 return fmt.Errorf("remote yamux internal error") 570 default: 571 s.logger.Printf("[ERR] yamux: received unexpected go away") 572 return fmt.Errorf("unexpected go away received") 573 } 574 return nil 575} 576 577// incomingStream is used to create a new incoming stream 578func (s *Session) incomingStream(id uint32) error { 579 // Reject immediately if we are doing a go away 580 if atomic.LoadInt32(&s.localGoAway) == 1 { 581 hdr := header(make([]byte, headerSize)) 582 hdr.encode(typeWindowUpdate, flagRST, id, 0) 583 return s.sendNoWait(hdr) 584 } 585 586 // Allocate a new stream 587 stream := newStream(s, id, streamSYNReceived) 588 589 s.streamLock.Lock() 590 defer s.streamLock.Unlock() 591 592 // Check if stream already exists 593 if _, ok := s.streams[id]; ok { 594 s.logger.Printf("[ERR] yamux: duplicate stream declared") 595 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { 596 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) 597 } 598 return ErrDuplicateStream 599 } 600 601 // Register the stream 602 s.streams[id] = stream 603 604 // Check if we've exceeded the backlog 605 select { 606 case s.acceptCh <- stream: 607 return nil 608 default: 609 // Backlog exceeded! RST the stream 610 s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") 611 delete(s.streams, id) 612 stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0) 613 return s.sendNoWait(stream.sendHdr) 614 } 615} 616 617// closeStream is used to close a stream once both sides have 618// issued a close. If there was an in-flight SYN and the stream 619// was not yet established, then this will give the credit back. 620func (s *Session) closeStream(id uint32) { 621 s.streamLock.Lock() 622 if _, ok := s.inflight[id]; ok { 623 select { 624 case <-s.synCh: 625 default: 626 s.logger.Printf("[ERR] yamux: SYN tracking out of sync") 627 } 628 } 629 delete(s.streams, id) 630 s.streamLock.Unlock() 631} 632 633// establishStream is used to mark a stream that was in the 634// SYN Sent state as established. 635func (s *Session) establishStream(id uint32) { 636 s.streamLock.Lock() 637 if _, ok := s.inflight[id]; ok { 638 delete(s.inflight, id) 639 } else { 640 s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)") 641 } 642 select { 643 case <-s.synCh: 644 default: 645 s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)") 646 } 647 s.streamLock.Unlock() 648} 649