// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package integration
16
17import (
18	"fmt"
19	"math/rand"
20	"sync/atomic"
21	"testing"
22
23	"github.com/coreos/etcd/contrib/recipes"
24)
25
// manyQueueClients is the client count used on the "many" side of the
// reader/writer tests and benchmarks in this file.
const manyQueueClients = 3

// queueItemsPerClient is the number of items each writer enqueues in
// testReadersWriters.
const queueItemsPerClient = 2
30
31// TestQueueOneReaderOneWriter confirms the queue is FIFO
32func TestQueueOneReaderOneWriter(t *testing.T) {
33	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
34	defer clus.Terminate(t)
35
36	done := make(chan struct{})
37	go func() {
38		defer func() {
39			done <- struct{}{}
40		}()
41		etcdc := clus.RandClient()
42		q := recipe.NewQueue(etcdc, "testq")
43		for i := 0; i < 5; i++ {
44			if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil {
45				t.Fatalf("error enqueuing (%v)", err)
46			}
47		}
48	}()
49
50	etcdc := clus.RandClient()
51	q := recipe.NewQueue(etcdc, "testq")
52	for i := 0; i < 5; i++ {
53		s, err := q.Dequeue()
54		if err != nil {
55			t.Fatalf("error dequeueing (%v)", err)
56		}
57		if s != fmt.Sprintf("%d", i) {
58			t.Fatalf("expected dequeue value %v, got %v", s, i)
59		}
60	}
61	<-done
62}
63
64func TestQueueManyReaderOneWriter(t *testing.T) {
65	testQueueNReaderMWriter(t, manyQueueClients, 1)
66}
67
68func TestQueueOneReaderManyWriter(t *testing.T) {
69	testQueueNReaderMWriter(t, 1, manyQueueClients)
70}
71
72func TestQueueManyReaderManyWriter(t *testing.T) {
73	testQueueNReaderMWriter(t, manyQueueClients, manyQueueClients)
74}
75
76// BenchmarkQueue benchmarks Queues using many/many readers/writers
77func BenchmarkQueue(b *testing.B) {
78	// XXX switch tests to use TB interface
79	clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
80	defer clus.Terminate(nil)
81	for i := 0; i < b.N; i++ {
82		testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients)
83	}
84}
85
86// TestPrQueueOneReaderOneWriter tests whether priority queues respect priorities.
87func TestPrQueueOneReaderOneWriter(t *testing.T) {
88	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
89	defer clus.Terminate(t)
90
91	// write out five items with random priority
92	etcdc := clus.RandClient()
93	q := recipe.NewPriorityQueue(etcdc, "testprq")
94	for i := 0; i < 5; i++ {
95		// [0, 2] priority for priority collision to test seq keys
96		pr := uint16(rand.Intn(3))
97		if err := q.Enqueue(fmt.Sprintf("%d", pr), pr); err != nil {
98			t.Fatalf("error enqueuing (%v)", err)
99		}
100	}
101
102	// read back items; confirm priority order is respected
103	lastPr := -1
104	for i := 0; i < 5; i++ {
105		s, err := q.Dequeue()
106		if err != nil {
107			t.Fatalf("error dequeueing (%v)", err)
108		}
109		curPr := 0
110		if _, err := fmt.Sscanf(s, "%d", &curPr); err != nil {
111			t.Fatalf(`error parsing item "%s" (%v)`, s, err)
112		}
113		if lastPr > curPr {
114			t.Fatalf("expected priority %v > %v", curPr, lastPr)
115		}
116	}
117}
118
119func TestPrQueueManyReaderManyWriter(t *testing.T) {
120	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
121	defer clus.Terminate(t)
122	rqs := newPriorityQueues(clus, manyQueueClients)
123	wqs := newPriorityQueues(clus, manyQueueClients)
124	testReadersWriters(t, rqs, wqs)
125}
126
127// BenchmarkQueue benchmarks Queues using n/n readers/writers
128func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
129	// XXX switch tests to use TB interface
130	clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
131	defer clus.Terminate(nil)
132	rqs := newPriorityQueues(clus, 1)
133	wqs := newPriorityQueues(clus, 1)
134	for i := 0; i < b.N; i++ {
135		testReadersWriters(nil, rqs, wqs)
136	}
137}
138
139func testQueueNReaderMWriter(t *testing.T, n int, m int) {
140	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
141	defer clus.Terminate(t)
142	testReadersWriters(t, newQueues(clus, n), newQueues(clus, m))
143}
144
145func newQueues(clus *ClusterV3, n int) (qs []testQueue) {
146	for i := 0; i < n; i++ {
147		etcdc := clus.RandClient()
148		qs = append(qs, recipe.NewQueue(etcdc, "q"))
149	}
150	return qs
151}
152
153func newPriorityQueues(clus *ClusterV3, n int) (qs []testQueue) {
154	for i := 0; i < n; i++ {
155		etcdc := clus.RandClient()
156		q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")}
157		qs = append(qs, q)
158	}
159	return qs
160}
161
162func testReadersWriters(t *testing.T, rqs []testQueue, wqs []testQueue) {
163	rerrc := make(chan error)
164	werrc := make(chan error)
165	manyWriters(wqs, queueItemsPerClient, werrc)
166	manyReaders(rqs, len(wqs)*queueItemsPerClient, rerrc)
167	for range wqs {
168		if err := <-werrc; err != nil {
169			t.Errorf("error writing (%v)", err)
170		}
171	}
172	for range rqs {
173		if err := <-rerrc; err != nil {
174			t.Errorf("error reading (%v)", err)
175		}
176	}
177}
178
179func manyReaders(qs []testQueue, totalReads int, errc chan<- error) {
180	var rxReads int32
181	for _, q := range qs {
182		go func(q testQueue) {
183			for {
184				total := atomic.AddInt32(&rxReads, 1)
185				if int(total) > totalReads {
186					break
187				}
188				if _, err := q.Dequeue(); err != nil {
189					errc <- err
190					return
191				}
192			}
193			errc <- nil
194		}(q)
195	}
196}
197
198func manyWriters(qs []testQueue, writesEach int, errc chan<- error) {
199	for _, q := range qs {
200		go func(q testQueue) {
201			for j := 0; j < writesEach; j++ {
202				if err := q.Enqueue("foo"); err != nil {
203					errc <- err
204					return
205				}
206			}
207			errc <- nil
208		}(q)
209	}
210}
211
// testQueue is the queue surface the shared reader/writer helpers in this
// file work against. It is satisfied by the values recipe.NewQueue returns
// (see newQueues) and by flatPriorityQueue.
type testQueue interface {
	// Enqueue submits item to the queue.
	Enqueue(item string) error
	// Dequeue takes one item from the queue.
	Dequeue() (item string, err error)
}
216
217type flatPriorityQueue struct{ *recipe.PriorityQueue }
218
219func (q *flatPriorityQueue) Enqueue(val string) error {
220	// randomized to stress dequeuing logic; order isn't important
221	return q.PriorityQueue.Enqueue(val, uint16(rand.Intn(2)))
222}
223func (q *flatPriorityQueue) Dequeue() (string, error) {
224	return q.PriorityQueue.Dequeue()
225}
226