1// run -gcflags=-G=3 2 3// Copyright 2021 The Go Authors. All rights reserved. 4// Use of this source code is governed by a BSD-style 5// license that can be found in the LICENSE file. 6 7// Package chans provides utility functions for working with channels. 8package main 9 10import ( 11 "context" 12 "fmt" 13 "runtime" 14 "sort" 15 "sync" 16 "time" 17) 18 19// _Equal reports whether two slices are equal: the same length and all 20// elements equal. All floating point NaNs are considered equal. 21func _SliceEqual[Elem comparable](s1, s2 []Elem) bool { 22 if len(s1) != len(s2) { 23 return false 24 } 25 for i, v1 := range s1 { 26 v2 := s2[i] 27 if v1 != v2 { 28 isNaN := func(f Elem) bool { return f != f } 29 if !isNaN(v1) || !isNaN(v2) { 30 return false 31 } 32 } 33 } 34 return true 35} 36 37// _ReadAll reads from c until the channel is closed or the context is 38// canceled, returning all the values read. 39func _ReadAll[Elem any](ctx context.Context, c <-chan Elem) []Elem { 40 var r []Elem 41 for { 42 select { 43 case <-ctx.Done(): 44 return r 45 case v, ok := <-c: 46 if !ok { 47 return r 48 } 49 r = append(r, v) 50 } 51 } 52} 53 54// _Merge merges two channels into a single channel. 55// This will leave a goroutine running until either both channels are closed 56// or the context is canceled, at which point the returned channel is closed. 57func _Merge[Elem any](ctx context.Context, c1, c2 <-chan Elem) <-chan Elem { 58 r := make(chan Elem) 59 go func(ctx context.Context, c1, c2 <-chan Elem, r chan<- Elem) { 60 defer close(r) 61 for c1 != nil || c2 != nil { 62 select { 63 case <-ctx.Done(): 64 return 65 case v1, ok := <-c1: 66 if ok { 67 r <- v1 68 } else { 69 c1 = nil 70 } 71 case v2, ok := <-c2: 72 if ok { 73 r <- v2 74 } else { 75 c2 = nil 76 } 77 } 78 } 79 }(ctx, c1, c2, r) 80 return r 81} 82 83// _Filter calls f on each value read from c. If f returns true the value 84// is sent on the returned channel. This will leave a goroutine running 85// until c is closed or the context is canceled, at which point the 86// returned channel is closed. 87func _Filter[Elem any](ctx context.Context, c <-chan Elem, f func(Elem) bool) <-chan Elem { 88 r := make(chan Elem) 89 go func(ctx context.Context, c <-chan Elem, f func(Elem) bool, r chan<- Elem) { 90 defer close(r) 91 for { 92 select { 93 case <-ctx.Done(): 94 return 95 case v, ok := <-c: 96 if !ok { 97 return 98 } 99 if f(v) { 100 r <- v 101 } 102 } 103 } 104 }(ctx, c, f, r) 105 return r 106} 107 108// _Sink returns a channel that discards all values sent to it. 109// This will leave a goroutine running until the context is canceled 110// or the returned channel is closed. 111func _Sink[Elem any](ctx context.Context) chan<- Elem { 112 r := make(chan Elem) 113 go func(ctx context.Context, r <-chan Elem) { 114 for { 115 select { 116 case <-ctx.Done(): 117 return 118 case _, ok := <-r: 119 if !ok { 120 return 121 } 122 } 123 } 124 }(ctx, r) 125 return r 126} 127 128// An Exclusive is a value that may only be used by a single goroutine 129// at a time. This is implemented using channels rather than a mutex. 130type _Exclusive[Val any] struct { 131 c chan Val 132} 133 134// _MakeExclusive makes an initialized exclusive value. 135func _MakeExclusive[Val any](initial Val) *_Exclusive[Val] { 136 r := &_Exclusive[Val]{ 137 c: make(chan Val, 1), 138 } 139 r.c <- initial 140 return r 141} 142 143// _Acquire acquires the exclusive value for private use. 144// It must be released using the Release method. 145func (e *_Exclusive[Val]) Acquire() Val { 146 return <-e.c 147} 148 149// TryAcquire attempts to acquire the value. The ok result reports whether 150// the value was acquired. If the value is acquired, it must be released 151// using the Release method. 152func (e *_Exclusive[Val]) TryAcquire() (v Val, ok bool) { 153 select { 154 case r := <-e.c: 155 return r, true 156 default: 157 return v, false 158 } 159} 160 161// Release updates and releases the value. 162// This method panics if the value has not been acquired. 163func (e *_Exclusive[Val]) Release(v Val) { 164 select { 165 case e.c <- v: 166 default: 167 panic("_Exclusive Release without Acquire") 168 } 169} 170 171// Ranger returns a Sender and a Receiver. The Receiver provides a 172// Next method to retrieve values. The Sender provides a Send method 173// to send values and a Close method to stop sending values. The Next 174// method indicates when the Sender has been closed, and the Send 175// method indicates when the Receiver has been freed. 176// 177// This is a convenient way to exit a goroutine sending values when 178// the receiver stops reading them. 179func _Ranger[Elem any]() (*_Sender[Elem], *_Receiver[Elem]) { 180 c := make(chan Elem) 181 d := make(chan struct{}) 182 s := &_Sender[Elem]{ 183 values: c, 184 done: d, 185 } 186 r := &_Receiver[Elem]{ 187 values: c, 188 done: d, 189 } 190 runtime.SetFinalizer(r, (*_Receiver[Elem]).finalize) 191 return s, r 192} 193 194// A _Sender is used to send values to a Receiver. 195type _Sender[Elem any] struct { 196 values chan<- Elem 197 done <-chan struct{} 198} 199 200// Send sends a value to the receiver. It reports whether the value was sent. 201// The value will not be sent if the context is closed or the receiver 202// is freed. 203func (s *_Sender[Elem]) Send(ctx context.Context, v Elem) bool { 204 select { 205 case <-ctx.Done(): 206 return false 207 case s.values <- v: 208 return true 209 case <-s.done: 210 return false 211 } 212} 213 214// Close tells the receiver that no more values will arrive. 215// After Close is called, the _Sender may no longer be used. 216func (s *_Sender[Elem]) Close() { 217 close(s.values) 218} 219 220// A _Receiver receives values from a _Sender. 221type _Receiver[Elem any] struct { 222 values <-chan Elem 223 done chan<- struct{} 224} 225 226// Next returns the next value from the channel. The bool result indicates 227// whether the value is valid. 228func (r *_Receiver[Elem]) Next(ctx context.Context) (v Elem, ok bool) { 229 select { 230 case <-ctx.Done(): 231 case v, ok = <-r.values: 232 } 233 return v, ok 234} 235 236// finalize is a finalizer for the receiver. 237func (r *_Receiver[Elem]) finalize() { 238 close(r.done) 239} 240 241func TestReadAll() { 242 c := make(chan int) 243 go func() { 244 c <- 4 245 c <- 2 246 c <- 5 247 close(c) 248 }() 249 got := _ReadAll(context.Background(), c) 250 want := []int{4, 2, 5} 251 if !_SliceEqual(got, want) { 252 panic(fmt.Sprintf("_ReadAll returned %v, want %v", got, want)) 253 } 254} 255 256func TestMerge() { 257 c1 := make(chan int) 258 c2 := make(chan int) 259 go func() { 260 c1 <- 1 261 c1 <- 3 262 c1 <- 5 263 close(c1) 264 }() 265 go func() { 266 c2 <- 2 267 c2 <- 4 268 c2 <- 6 269 close(c2) 270 }() 271 ctx := context.Background() 272 got := _ReadAll(ctx, _Merge(ctx, c1, c2)) 273 sort.Ints(got) 274 want := []int{1, 2, 3, 4, 5, 6} 275 if !_SliceEqual(got, want) { 276 panic(fmt.Sprintf("_Merge returned %v, want %v", got, want)) 277 } 278} 279 280func TestFilter() { 281 c := make(chan int) 282 go func() { 283 c <- 1 284 c <- 2 285 c <- 3 286 close(c) 287 }() 288 even := func(i int) bool { return i%2 == 0 } 289 ctx := context.Background() 290 got := _ReadAll(ctx, _Filter(ctx, c, even)) 291 want := []int{2} 292 if !_SliceEqual(got, want) { 293 panic(fmt.Sprintf("_Filter returned %v, want %v", got, want)) 294 } 295} 296 297func TestSink() { 298 c := _Sink[int](context.Background()) 299 after := time.NewTimer(time.Minute) 300 defer after.Stop() 301 send := func(v int) { 302 select { 303 case c <- v: 304 case <-after.C: 305 panic("timed out sending to _Sink") 306 } 307 } 308 send(1) 309 send(2) 310 send(3) 311 close(c) 312} 313 314func TestExclusive() { 315 val := 0 316 ex := _MakeExclusive(&val) 317 318 var wg sync.WaitGroup 319 f := func() { 320 defer wg.Done() 321 for i := 0; i < 10; i++ { 322 p := ex.Acquire() 323 (*p)++ 324 ex.Release(p) 325 } 326 } 327 328 wg.Add(2) 329 go f() 330 go f() 331 332 wg.Wait() 333 if val != 20 { 334 panic(fmt.Sprintf("after Acquire/Release loop got %d, want 20", val)) 335 } 336} 337 338func TestExclusiveTry() { 339 s := "" 340 ex := _MakeExclusive(&s) 341 p, ok := ex.TryAcquire() 342 if !ok { 343 panic("TryAcquire failed") 344 } 345 *p = "a" 346 347 var wg sync.WaitGroup 348 wg.Add(1) 349 go func() { 350 defer wg.Done() 351 _, ok := ex.TryAcquire() 352 if ok { 353 panic(fmt.Sprintf("TryAcquire succeeded unexpectedly")) 354 } 355 }() 356 wg.Wait() 357 358 ex.Release(p) 359 360 p, ok = ex.TryAcquire() 361 if !ok { 362 panic(fmt.Sprintf("TryAcquire failed")) 363 } 364} 365 366func TestRanger() { 367 s, r := _Ranger[int]() 368 369 ctx := context.Background() 370 go func() { 371 // Receive one value then exit. 372 v, ok := r.Next(ctx) 373 if !ok { 374 panic(fmt.Sprintf("did not receive any values")) 375 } else if v != 1 { 376 panic(fmt.Sprintf("received %d, want 1", v)) 377 } 378 }() 379 380 c1 := make(chan bool) 381 c2 := make(chan bool) 382 go func() { 383 defer close(c2) 384 if !s.Send(ctx, 1) { 385 panic(fmt.Sprintf("Send failed unexpectedly")) 386 } 387 close(c1) 388 if s.Send(ctx, 2) { 389 panic(fmt.Sprintf("Send succeeded unexpectedly")) 390 } 391 }() 392 393 <-c1 394 395 // Force a garbage collection to try to get the finalizers to run. 396 runtime.GC() 397 398 select { 399 case <-c2: 400 case <-time.After(time.Minute): 401 panic("_Ranger Send should have failed, but timed out") 402 } 403} 404 405func main() { 406 TestReadAll() 407 TestMerge() 408 TestFilter() 409 TestSink() 410 TestExclusive() 411 TestExclusiveTry() 412 TestRanger() 413} 414