1package spdystream
2
3import (
4	"sync"
5	"testing"
6	"time"
7
8	"github.com/docker/spdystream/spdy"
9)
10
11func TestPriorityQueueOrdering(t *testing.T) {
12	queue := NewPriorityFrameQueue(150)
13	data1 := &spdy.DataFrame{}
14	data2 := &spdy.DataFrame{}
15	data3 := &spdy.DataFrame{}
16	data4 := &spdy.DataFrame{}
17	queue.Push(data1, 2)
18	queue.Push(data2, 1)
19	queue.Push(data3, 1)
20	queue.Push(data4, 0)
21
22	if queue.Pop() != data4 {
23		t.Fatalf("Wrong order, expected data4 first")
24	}
25	if queue.Pop() != data2 {
26		t.Fatalf("Wrong order, expected data2 second")
27	}
28	if queue.Pop() != data3 {
29		t.Fatalf("Wrong order, expected data3 third")
30	}
31	if queue.Pop() != data1 {
32		t.Fatalf("Wrong order, expected data1 fourth")
33	}
34
35	// Insert 50 Medium priority frames
36	for i := spdy.StreamId(50); i < 100; i++ {
37		queue.Push(&spdy.DataFrame{StreamId: i}, 1)
38	}
39	// Insert 50 low priority frames
40	for i := spdy.StreamId(100); i < 150; i++ {
41		queue.Push(&spdy.DataFrame{StreamId: i}, 2)
42	}
43	// Insert 50 high priority frames
44	for i := spdy.StreamId(0); i < 50; i++ {
45		queue.Push(&spdy.DataFrame{StreamId: i}, 0)
46	}
47
48	for i := spdy.StreamId(0); i < 150; i++ {
49		frame := queue.Pop()
50		if frame.(*spdy.DataFrame).StreamId != i {
51			t.Fatalf("Wrong frame\nActual: %d\nExpecting: %d", frame.(*spdy.DataFrame).StreamId, i)
52		}
53	}
54}
55
56func TestPriorityQueueSync(t *testing.T) {
57	queue := NewPriorityFrameQueue(150)
58	var wg sync.WaitGroup
59	insertRange := func(start, stop spdy.StreamId, priority uint8) {
60		for i := start; i < stop; i++ {
61			queue.Push(&spdy.DataFrame{StreamId: i}, priority)
62		}
63		wg.Done()
64	}
65	wg.Add(3)
66	go insertRange(spdy.StreamId(100), spdy.StreamId(150), 2)
67	go insertRange(spdy.StreamId(0), spdy.StreamId(50), 0)
68	go insertRange(spdy.StreamId(50), spdy.StreamId(100), 1)
69
70	wg.Wait()
71	for i := spdy.StreamId(0); i < 150; i++ {
72		frame := queue.Pop()
73		if frame.(*spdy.DataFrame).StreamId != i {
74			t.Fatalf("Wrong frame\nActual: %d\nExpecting: %d", frame.(*spdy.DataFrame).StreamId, i)
75		}
76	}
77}
78
79func TestPriorityQueueBlocking(t *testing.T) {
80	queue := NewPriorityFrameQueue(15)
81	for i := 0; i < 15; i++ {
82		queue.Push(&spdy.DataFrame{}, 2)
83	}
84	doneChan := make(chan bool)
85	go func() {
86		queue.Push(&spdy.DataFrame{}, 2)
87		close(doneChan)
88	}()
89	select {
90	case <-doneChan:
91		t.Fatalf("Push succeeded, expected to block")
92	case <-time.After(time.Millisecond):
93		break
94	}
95
96	queue.Pop()
97
98	select {
99	case <-doneChan:
100		break
101	case <-time.After(time.Millisecond):
102		t.Fatalf("Push should have succeeded, but timeout reached")
103	}
104
105	for i := 0; i < 15; i++ {
106		queue.Pop()
107	}
108}
109