1package quic 2 3import ( 4 "github.com/lucas-clemente/quic-go/internal/protocol" 5 "github.com/lucas-clemente/quic-go/internal/utils" 6 "github.com/lucas-clemente/quic-go/internal/wire" 7) 8 9type datagramQueue struct { 10 sendQueue chan *wire.DatagramFrame 11 rcvQueue chan []byte 12 13 closeErr error 14 closed chan struct{} 15 16 hasData func() 17 18 dequeued chan struct{} 19 20 logger utils.Logger 21} 22 23func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { 24 return &datagramQueue{ 25 hasData: hasData, 26 sendQueue: make(chan *wire.DatagramFrame, 1), 27 rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen), 28 dequeued: make(chan struct{}), 29 closed: make(chan struct{}), 30 logger: logger, 31 } 32} 33 34// AddAndWait queues a new DATAGRAM frame for sending. 35// It blocks until the frame has been dequeued. 36func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { 37 select { 38 case h.sendQueue <- f: 39 h.hasData() 40 case <-h.closed: 41 return h.closeErr 42 } 43 44 select { 45 case <-h.dequeued: 46 return nil 47 case <-h.closed: 48 return h.closeErr 49 } 50} 51 52// Get dequeues a DATAGRAM frame for sending. 53func (h *datagramQueue) Get() *wire.DatagramFrame { 54 select { 55 case f := <-h.sendQueue: 56 h.dequeued <- struct{}{} 57 return f 58 default: 59 return nil 60 } 61} 62 63// HandleDatagramFrame handles a received DATAGRAM frame. 64func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) { 65 data := make([]byte, len(f.Data)) 66 copy(data, f.Data) 67 select { 68 case h.rcvQueue <- data: 69 default: 70 h.logger.Debugf("Discarding DATAGRAM frame (%d bytes payload)", len(f.Data)) 71 } 72} 73 74// Receive gets a received DATAGRAM frame. 75func (h *datagramQueue) Receive() ([]byte, error) { 76 select { 77 case data := <-h.rcvQueue: 78 return data, nil 79 case <-h.closed: 80 return nil, h.closeErr 81 } 82} 83 84func (h *datagramQueue) CloseWithError(e error) { 85 h.closeErr = e 86 close(h.closed) 87} 88