1// Copyright 2017 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package semaphore_test 6 7import ( 8 "context" 9 "math/rand" 10 "runtime" 11 "sync" 12 "testing" 13 "time" 14 15 "golang.org/x/sync/errgroup" 16 "golang.org/x/sync/semaphore" 17) 18 19const maxSleep = 1 * time.Millisecond 20 21func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) { 22 for i := 0; i < loops; i++ { 23 sem.Acquire(context.Background(), n) 24 time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond) 25 sem.Release(n) 26 } 27} 28 29func TestWeighted(t *testing.T) { 30 t.Parallel() 31 32 n := runtime.GOMAXPROCS(0) 33 loops := 10000 / n 34 sem := semaphore.NewWeighted(int64(n)) 35 var wg sync.WaitGroup 36 wg.Add(n) 37 for i := 0; i < n; i++ { 38 i := i 39 go func() { 40 defer wg.Done() 41 HammerWeighted(sem, int64(i), loops) 42 }() 43 } 44 wg.Wait() 45} 46 47func TestWeightedPanic(t *testing.T) { 48 t.Parallel() 49 50 defer func() { 51 if recover() == nil { 52 t.Fatal("release of an unacquired weighted semaphore did not panic") 53 } 54 }() 55 w := semaphore.NewWeighted(1) 56 w.Release(1) 57} 58 59func TestWeightedTryAcquire(t *testing.T) { 60 t.Parallel() 61 62 ctx := context.Background() 63 sem := semaphore.NewWeighted(2) 64 tries := []bool{} 65 sem.Acquire(ctx, 1) 66 tries = append(tries, sem.TryAcquire(1)) 67 tries = append(tries, sem.TryAcquire(1)) 68 69 sem.Release(2) 70 71 tries = append(tries, sem.TryAcquire(1)) 72 sem.Acquire(ctx, 1) 73 tries = append(tries, sem.TryAcquire(1)) 74 75 want := []bool{true, false, true, false} 76 for i := range tries { 77 if tries[i] != want[i] { 78 t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i]) 79 } 80 } 81} 82 83func TestWeightedAcquire(t *testing.T) { 84 t.Parallel() 85 86 ctx := context.Background() 87 sem := semaphore.NewWeighted(2) 88 tryAcquire := func(n int64) bool { 89 ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) 90 defer cancel() 91 return sem.Acquire(ctx, n) == nil 92 } 93 94 tries := []bool{} 95 sem.Acquire(ctx, 1) 96 tries = append(tries, tryAcquire(1)) 97 tries = append(tries, tryAcquire(1)) 98 99 sem.Release(2) 100 101 tries = append(tries, tryAcquire(1)) 102 sem.Acquire(ctx, 1) 103 tries = append(tries, tryAcquire(1)) 104 105 want := []bool{true, false, true, false} 106 for i := range tries { 107 if tries[i] != want[i] { 108 t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i]) 109 } 110 } 111} 112 113func TestWeightedDoesntBlockIfTooBig(t *testing.T) { 114 t.Parallel() 115 116 const n = 2 117 sem := semaphore.NewWeighted(n) 118 { 119 ctx, cancel := context.WithCancel(context.Background()) 120 defer cancel() 121 go sem.Acquire(ctx, n+1) 122 } 123 124 g, ctx := errgroup.WithContext(context.Background()) 125 for i := n * 3; i > 0; i-- { 126 g.Go(func() error { 127 err := sem.Acquire(ctx, 1) 128 if err == nil { 129 time.Sleep(1 * time.Millisecond) 130 sem.Release(1) 131 } 132 return err 133 }) 134 } 135 if err := g.Wait(); err != nil { 136 t.Errorf("semaphore.NewWeighted(%v) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1) 137 } 138} 139 140// TestLargeAcquireDoesntStarve times out if a large call to Acquire starves. 141// Merely returning from the test function indicates success. 142func TestLargeAcquireDoesntStarve(t *testing.T) { 143 t.Parallel() 144 145 ctx := context.Background() 146 n := int64(runtime.GOMAXPROCS(0)) 147 sem := semaphore.NewWeighted(n) 148 running := true 149 150 var wg sync.WaitGroup 151 wg.Add(int(n)) 152 for i := n; i > 0; i-- { 153 sem.Acquire(ctx, 1) 154 go func() { 155 defer func() { 156 sem.Release(1) 157 wg.Done() 158 }() 159 for running { 160 time.Sleep(1 * time.Millisecond) 161 sem.Release(1) 162 sem.Acquire(ctx, 1) 163 } 164 }() 165 } 166 167 sem.Acquire(ctx, n) 168 running = false 169 sem.Release(n) 170 wg.Wait() 171} 172 173// translated from https://github.com/zhiqiangxu/util/blob/master/mutex/crwmutex_test.go#L43 174func TestAllocCancelDoesntStarve(t *testing.T) { 175 sem := semaphore.NewWeighted(10) 176 177 // Block off a portion of the semaphore so that Acquire(_, 10) can eventually succeed. 178 sem.Acquire(context.Background(), 1) 179 180 // In the background, Acquire(_, 10). 181 ctx, cancel := context.WithCancel(context.Background()) 182 defer cancel() 183 go func() { 184 sem.Acquire(ctx, 10) 185 }() 186 187 // Wait until the Acquire(_, 10) call blocks. 188 for sem.TryAcquire(1) { 189 sem.Release(1) 190 runtime.Gosched() 191 } 192 193 // Now try to grab a read lock, and simultaneously unblock the Acquire(_, 10) call. 194 // Both Acquire calls should unblock and return, in either order. 195 go cancel() 196 197 err := sem.Acquire(context.Background(), 1) 198 if err != nil { 199 t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err) 200 } 201 sem.Release(1) 202} 203