1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package integration 16 17import ( 18 "fmt" 19 "math/rand" 20 "sync/atomic" 21 "testing" 22 23 "github.com/coreos/etcd/contrib/recipes" 24) 25 26const ( 27 manyQueueClients = 3 28 queueItemsPerClient = 2 29) 30 31// TestQueueOneReaderOneWriter confirms the queue is FIFO 32func TestQueueOneReaderOneWriter(t *testing.T) { 33 clus := NewClusterV3(t, &ClusterConfig{Size: 1}) 34 defer clus.Terminate(t) 35 36 done := make(chan struct{}) 37 go func() { 38 defer func() { 39 done <- struct{}{} 40 }() 41 etcdc := clus.RandClient() 42 q := recipe.NewQueue(etcdc, "testq") 43 for i := 0; i < 5; i++ { 44 if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil { 45 t.Fatalf("error enqueuing (%v)", err) 46 } 47 } 48 }() 49 50 etcdc := clus.RandClient() 51 q := recipe.NewQueue(etcdc, "testq") 52 for i := 0; i < 5; i++ { 53 s, err := q.Dequeue() 54 if err != nil { 55 t.Fatalf("error dequeueing (%v)", err) 56 } 57 if s != fmt.Sprintf("%d", i) { 58 t.Fatalf("expected dequeue value %v, got %v", s, i) 59 } 60 } 61 <-done 62} 63 64func TestQueueManyReaderOneWriter(t *testing.T) { 65 testQueueNReaderMWriter(t, manyQueueClients, 1) 66} 67 68func TestQueueOneReaderManyWriter(t *testing.T) { 69 testQueueNReaderMWriter(t, 1, manyQueueClients) 70} 71 72func TestQueueManyReaderManyWriter(t *testing.T) { 73 testQueueNReaderMWriter(t, manyQueueClients, manyQueueClients) 74} 75 76// BenchmarkQueue benchmarks Queues using many/many readers/writers 77func BenchmarkQueue(b *testing.B) { 78 // XXX switch tests to use TB interface 79 clus := NewClusterV3(nil, &ClusterConfig{Size: 3}) 80 defer clus.Terminate(nil) 81 for i := 0; i < b.N; i++ { 82 testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients) 83 } 84} 85 86// TestPrQueueOneReaderOneWriter tests whether priority queues respect priorities. 87func TestPrQueueOneReaderOneWriter(t *testing.T) { 88 clus := NewClusterV3(t, &ClusterConfig{Size: 1}) 89 defer clus.Terminate(t) 90 91 // write out five items with random priority 92 etcdc := clus.RandClient() 93 q := recipe.NewPriorityQueue(etcdc, "testprq") 94 for i := 0; i < 5; i++ { 95 // [0, 2] priority for priority collision to test seq keys 96 pr := uint16(rand.Intn(3)) 97 if err := q.Enqueue(fmt.Sprintf("%d", pr), pr); err != nil { 98 t.Fatalf("error enqueuing (%v)", err) 99 } 100 } 101 102 // read back items; confirm priority order is respected 103 lastPr := -1 104 for i := 0; i < 5; i++ { 105 s, err := q.Dequeue() 106 if err != nil { 107 t.Fatalf("error dequeueing (%v)", err) 108 } 109 curPr := 0 110 if _, err := fmt.Sscanf(s, "%d", &curPr); err != nil { 111 t.Fatalf(`error parsing item "%s" (%v)`, s, err) 112 } 113 if lastPr > curPr { 114 t.Fatalf("expected priority %v > %v", curPr, lastPr) 115 } 116 } 117} 118 119func TestPrQueueManyReaderManyWriter(t *testing.T) { 120 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 121 defer clus.Terminate(t) 122 rqs := newPriorityQueues(clus, manyQueueClients) 123 wqs := newPriorityQueues(clus, manyQueueClients) 124 testReadersWriters(t, rqs, wqs) 125} 126 127// BenchmarkQueue benchmarks Queues using n/n readers/writers 128func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) { 129 // XXX switch tests to use TB interface 130 clus := NewClusterV3(nil, &ClusterConfig{Size: 3}) 131 defer clus.Terminate(nil) 132 rqs := newPriorityQueues(clus, 1) 133 wqs := newPriorityQueues(clus, 1) 134 for i := 0; i < b.N; i++ { 135 testReadersWriters(nil, rqs, wqs) 136 } 137} 138 139func testQueueNReaderMWriter(t *testing.T, n int, m int) { 140 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 141 defer clus.Terminate(t) 142 testReadersWriters(t, newQueues(clus, n), newQueues(clus, m)) 143} 144 145func newQueues(clus *ClusterV3, n int) (qs []testQueue) { 146 for i := 0; i < n; i++ { 147 etcdc := clus.RandClient() 148 qs = append(qs, recipe.NewQueue(etcdc, "q")) 149 } 150 return qs 151} 152 153func newPriorityQueues(clus *ClusterV3, n int) (qs []testQueue) { 154 for i := 0; i < n; i++ { 155 etcdc := clus.RandClient() 156 q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")} 157 qs = append(qs, q) 158 } 159 return qs 160} 161 162func testReadersWriters(t *testing.T, rqs []testQueue, wqs []testQueue) { 163 rerrc := make(chan error) 164 werrc := make(chan error) 165 manyWriters(wqs, queueItemsPerClient, werrc) 166 manyReaders(rqs, len(wqs)*queueItemsPerClient, rerrc) 167 for range wqs { 168 if err := <-werrc; err != nil { 169 t.Errorf("error writing (%v)", err) 170 } 171 } 172 for range rqs { 173 if err := <-rerrc; err != nil { 174 t.Errorf("error reading (%v)", err) 175 } 176 } 177} 178 179func manyReaders(qs []testQueue, totalReads int, errc chan<- error) { 180 var rxReads int32 181 for _, q := range qs { 182 go func(q testQueue) { 183 for { 184 total := atomic.AddInt32(&rxReads, 1) 185 if int(total) > totalReads { 186 break 187 } 188 if _, err := q.Dequeue(); err != nil { 189 errc <- err 190 return 191 } 192 } 193 errc <- nil 194 }(q) 195 } 196} 197 198func manyWriters(qs []testQueue, writesEach int, errc chan<- error) { 199 for _, q := range qs { 200 go func(q testQueue) { 201 for j := 0; j < writesEach; j++ { 202 if err := q.Enqueue("foo"); err != nil { 203 errc <- err 204 return 205 } 206 } 207 errc <- nil 208 }(q) 209 } 210} 211 212type testQueue interface { 213 Enqueue(val string) error 214 Dequeue() (string, error) 215} 216 217type flatPriorityQueue struct{ *recipe.PriorityQueue } 218 219func (q *flatPriorityQueue) Enqueue(val string) error { 220 // randomized to stress dequeuing logic; order isn't important 221 return q.PriorityQueue.Enqueue(val, uint16(rand.Intn(2))) 222} 223func (q *flatPriorityQueue) Dequeue() (string, error) { 224 return q.PriorityQueue.Dequeue() 225} 226