1package quic
2
3import (
4	"github.com/lucas-clemente/quic-go/internal/protocol"
5	"github.com/lucas-clemente/quic-go/internal/utils"
6	"github.com/lucas-clemente/quic-go/internal/wire"
7)
8
9type datagramQueue struct {
10	sendQueue chan *wire.DatagramFrame
11	rcvQueue  chan []byte
12
13	closeErr error
14	closed   chan struct{}
15
16	hasData func()
17
18	dequeued chan struct{}
19
20	logger utils.Logger
21}
22
23func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
24	return &datagramQueue{
25		hasData:   hasData,
26		sendQueue: make(chan *wire.DatagramFrame, 1),
27		rcvQueue:  make(chan []byte, protocol.DatagramRcvQueueLen),
28		dequeued:  make(chan struct{}),
29		closed:    make(chan struct{}),
30		logger:    logger,
31	}
32}
33
34// AddAndWait queues a new DATAGRAM frame for sending.
35// It blocks until the frame has been dequeued.
36func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
37	select {
38	case h.sendQueue <- f:
39		h.hasData()
40	case <-h.closed:
41		return h.closeErr
42	}
43
44	select {
45	case <-h.dequeued:
46		return nil
47	case <-h.closed:
48		return h.closeErr
49	}
50}
51
52// Get dequeues a DATAGRAM frame for sending.
53func (h *datagramQueue) Get() *wire.DatagramFrame {
54	select {
55	case f := <-h.sendQueue:
56		h.dequeued <- struct{}{}
57		return f
58	default:
59		return nil
60	}
61}
62
63// HandleDatagramFrame handles a received DATAGRAM frame.
64func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) {
65	data := make([]byte, len(f.Data))
66	copy(data, f.Data)
67	select {
68	case h.rcvQueue <- data:
69	default:
70		h.logger.Debugf("Discarding DATAGRAM frame (%d bytes payload)", len(f.Data))
71	}
72}
73
74// Receive gets a received DATAGRAM frame.
75func (h *datagramQueue) Receive() ([]byte, error) {
76	select {
77	case data := <-h.rcvQueue:
78		return data, nil
79	case <-h.closed:
80		return nil, h.closeErr
81	}
82}
83
84func (h *datagramQueue) CloseWithError(e error) {
85	h.closeErr = e
86	close(h.closed)
87}
88