1package quic 2 3type sender interface { 4 Send(p *packetBuffer) 5 Run() error 6 WouldBlock() bool 7 Available() <-chan struct{} 8 Close() 9} 10 11type sendQueue struct { 12 queue chan *packetBuffer 13 closeCalled chan struct{} // runStopped when Close() is called 14 runStopped chan struct{} // runStopped when the run loop returns 15 available chan struct{} 16 conn sendConn 17} 18 19var _ sender = &sendQueue{} 20 21const sendQueueCapacity = 8 22 23func newSendQueue(conn sendConn) sender { 24 return &sendQueue{ 25 conn: conn, 26 runStopped: make(chan struct{}), 27 closeCalled: make(chan struct{}), 28 available: make(chan struct{}, 1), 29 queue: make(chan *packetBuffer, sendQueueCapacity), 30 } 31} 32 33// Send sends out a packet. It's guaranteed to not block. 34// Callers need to make sure that there's actually space in the send queue by calling WouldBlock. 35// Otherwise Send will panic. 36func (h *sendQueue) Send(p *packetBuffer) { 37 select { 38 case h.queue <- p: 39 case <-h.runStopped: 40 default: 41 panic("sendQueue.Send would have blocked") 42 } 43} 44 45func (h *sendQueue) WouldBlock() bool { 46 return len(h.queue) == sendQueueCapacity 47} 48 49func (h *sendQueue) Available() <-chan struct{} { 50 return h.available 51} 52 53func (h *sendQueue) Run() error { 54 defer close(h.runStopped) 55 var shouldClose bool 56 for { 57 if shouldClose && len(h.queue) == 0 { 58 return nil 59 } 60 select { 61 case <-h.closeCalled: 62 h.closeCalled = nil // prevent this case from being selected again 63 // make sure that all queued packets are actually sent out 64 shouldClose = true 65 case p := <-h.queue: 66 if err := h.conn.Write(p.Data); err != nil { 67 return err 68 } 69 p.Release() 70 select { 71 case h.available <- struct{}{}: 72 default: 73 } 74 } 75 } 76} 77 78func (h *sendQueue) Close() { 79 close(h.closeCalled) 80 // wait until the run loop returned 81 <-h.runStopped 82} 83