1// Copyright 2021 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package a 6 7import ( 8 "context" 9 "runtime" 10) 11 12// Equal reports whether two slices are equal: the same length and all 13// elements equal. All floating point NaNs are considered equal. 14func SliceEqual[Elem comparable](s1, s2 []Elem) bool { 15 if len(s1) != len(s2) { 16 return false 17 } 18 for i, v1 := range s1 { 19 v2 := s2[i] 20 if v1 != v2 { 21 isNaN := func(f Elem) bool { return f != f } 22 if !isNaN(v1) || !isNaN(v2) { 23 return false 24 } 25 } 26 } 27 return true 28} 29 30// ReadAll reads from c until the channel is closed or the context is 31// canceled, returning all the values read. 32func ReadAll[Elem any](ctx context.Context, c <-chan Elem) []Elem { 33 var r []Elem 34 for { 35 select { 36 case <-ctx.Done(): 37 return r 38 case v, ok := <-c: 39 if !ok { 40 return r 41 } 42 r = append(r, v) 43 } 44 } 45} 46 47// Merge merges two channels into a single channel. 48// This will leave a goroutine running until either both channels are closed 49// or the context is canceled, at which point the returned channel is closed. 50func Merge[Elem any](ctx context.Context, c1, c2 <-chan Elem) <-chan Elem { 51 r := make(chan Elem) 52 go func(ctx context.Context, c1, c2 <-chan Elem, r chan<- Elem) { 53 defer close(r) 54 for c1 != nil || c2 != nil { 55 select { 56 case <-ctx.Done(): 57 return 58 case v1, ok := <-c1: 59 if ok { 60 r <- v1 61 } else { 62 c1 = nil 63 } 64 case v2, ok := <-c2: 65 if ok { 66 r <- v2 67 } else { 68 c2 = nil 69 } 70 } 71 } 72 }(ctx, c1, c2, r) 73 return r 74} 75 76// Filter calls f on each value read from c. If f returns true the value 77// is sent on the returned channel. This will leave a goroutine running 78// until c is closed or the context is canceled, at which point the 79// returned channel is closed. 
func Filter[Elem any](ctx context.Context, c <-chan Elem, f func(Elem) bool) <-chan Elem {
	r := make(chan Elem)
	go func(ctx context.Context, c <-chan Elem, f func(Elem) bool, r chan<- Elem) {
		defer close(r)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-c:
				if !ok {
					return
				}
				if !f(v) {
					continue
				}
				// Deliver v, but give up if ctx is canceled first so
				// the goroutine cannot stay blocked on a consumer
				// that has stopped reading.
				select {
				case r <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}(ctx, c, f, r)
	return r
}

// Sink returns a channel that discards all values sent to it.
// This will leave a goroutine running until the context is canceled
// or the returned channel is closed.
func Sink[Elem any](ctx context.Context) chan<- Elem {
	r := make(chan Elem)
	go func(ctx context.Context, r <-chan Elem) {
		for {
			select {
			case <-ctx.Done():
				return
			case _, ok := <-r:
				// ok is false once the caller has closed the channel.
				if !ok {
					return
				}
			}
		}
	}(ctx, r)
	return r
}

// An Exclusive is a value that may only be used by a single goroutine
// at a time. This is implemented using channels rather than a mutex.
type Exclusive[Val any] struct {
	// c holds the value while nobody has acquired it and is empty
	// while the value is held.
	c chan Val
}

// MakeExclusive makes an initialized exclusive value.
func MakeExclusive[Val any](initial Val) *Exclusive[Val] {
	r := &Exclusive[Val]{
		// Capacity one lets the channel store the value itself.
		c: make(chan Val, 1),
	}
	r.c <- initial
	return r
}

// Acquire acquires the exclusive value for private use.
// It must be released using the Release method.
// Acquire blocks while the value is held elsewhere.
func (e *Exclusive[Val]) Acquire() Val {
	return <-e.c
}

// TryAcquire attempts to acquire the value. The ok result reports whether
// the value was acquired. If the value is acquired, it must be released
// using the Release method.
func (e *Exclusive[Val]) TryAcquire() (v Val, ok bool) {
	select {
	case r := <-e.c:
		return r, true
	default:
		// The value is currently held; v is still the zero Val.
		return v, false
	}
}

// Release updates and releases the value.
// This method panics if the value has not been acquired.
156func (e *Exclusive[Val]) Release(v Val) { 157 select { 158 case e.c <- v: 159 default: 160 panic("Exclusive Release without Acquire") 161 } 162} 163 164// Ranger returns a Sender and a Receiver. The Receiver provides a 165// Next method to retrieve values. The Sender provides a Send method 166// to send values and a Close method to stop sending values. The Next 167// method indicates when the Sender has been closed, and the Send 168// method indicates when the Receiver has been freed. 169// 170// This is a convenient way to exit a goroutine sending values when 171// the receiver stops reading them. 172func Ranger[Elem any]() (*Sender[Elem], *Receiver[Elem]) { 173 c := make(chan Elem) 174 d := make(chan struct{}) 175 s := &Sender[Elem]{ 176 values: c, 177 done: d, 178 } 179 r := &Receiver[Elem]{ 180 values: c, 181 done: d, 182 } 183 runtime.SetFinalizer(r, (*Receiver[Elem]).finalize) 184 return s, r 185} 186 187// A Sender is used to send values to a Receiver. 188type Sender[Elem any] struct { 189 values chan<- Elem 190 done <-chan struct{} 191} 192 193// Send sends a value to the receiver. It reports whether the value was sent. 194// The value will not be sent if the context is closed or the receiver 195// is freed. 196func (s *Sender[Elem]) Send(ctx context.Context, v Elem) bool { 197 select { 198 case <-ctx.Done(): 199 return false 200 case s.values <- v: 201 return true 202 case <-s.done: 203 return false 204 } 205} 206 207// Close tells the receiver that no more values will arrive. 208// After Close is called, the Sender may no longer be used. 209func (s *Sender[Elem]) Close() { 210 close(s.values) 211} 212 213// A Receiver receives values from a Sender. 214type Receiver[Elem any] struct { 215 values <-chan Elem 216 done chan<- struct{} 217} 218 219// Next returns the next value from the channel. The bool result indicates 220// whether the value is valid. 
221func (r *Receiver[Elem]) Next(ctx context.Context) (v Elem, ok bool) { 222 select { 223 case <-ctx.Done(): 224 case v, ok = <-r.values: 225 } 226 return v, ok 227} 228 229// finalize is a finalizer for the receiver. 230func (r *Receiver[Elem]) finalize() { 231 close(r.done) 232} 233