1// run -gcflags=-G=3
2
3// Copyright 2021 The Go Authors. All rights reserved.
4// Use of this source code is governed by a BSD-style
5// license that can be found in the LICENSE file.
6
// This program exercises generic utility functions for working with channels.
8package main
9
10import (
11	"context"
12	"fmt"
13	"runtime"
14	"sort"
15	"sync"
16	"time"
17)
18
// _SliceEqual reports whether s1 and s2 have the same length and hold
// equal elements at every index. Two elements that are each unequal to
// themselves (floating point NaNs) are treated as equal to one another.
func _SliceEqual[Elem comparable](s1, s2 []Elem) bool {
	if len(s1) != len(s2) {
		return false
	}
	for idx := range s1 {
		a, b := s1[idx], s2[idx]
		if a == b {
			continue
		}
		// A value that differs from itself is a NaN; a mismatch is only
		// tolerated when both sides are NaN.
		bothNaN := a != a && b != b
		if !bothNaN {
			return false
		}
	}
	return true
}
36
// _ReadAll collects values received from c into a slice. It stops and
// returns what has been collected so far as soon as c is closed or ctx
// is canceled. The result is nil if nothing was received.
func _ReadAll[Elem any](ctx context.Context, c <-chan Elem) []Elem {
	var out []Elem
collect:
	for {
		select {
		case <-ctx.Done():
			break collect
		case elem, open := <-c:
			if !open {
				break collect
			}
			out = append(out, elem)
		}
	}
	return out
}
53
// _Merge merges two channels into a single channel.
//
// A goroutine forwards every value received from c1 and c2 to the
// returned channel. It keeps running until either both input channels
// are closed or ctx is canceled, at which point the returned channel is
// closed. Cancellation is also honored while a value is waiting to be
// delivered, so the goroutine does not leak when the consumer stops
// reading; a value that was received but not yet delivered at that
// moment is dropped.
func _Merge[Elem any](ctx context.Context, c1, c2 <-chan Elem) <-chan Elem {
	r := make(chan Elem)
	go func(ctx context.Context, c1, c2 <-chan Elem, r chan<- Elem) {
		defer close(r)

		// forward delivers v on r and reports whether it was delivered;
		// it gives up as soon as the context is canceled.
		forward := func(v Elem) bool {
			select {
			case r <- v:
				return true
			case <-ctx.Done():
				return false
			}
		}

		// A closed input is set to nil so that its select case can
		// never fire again; the loop ends once both are nil.
		for c1 != nil || c2 != nil {
			select {
			case <-ctx.Done():
				return
			case v1, ok := <-c1:
				if !ok {
					c1 = nil
					continue
				}
				if !forward(v1) {
					return
				}
			case v2, ok := <-c2:
				if !ok {
					c2 = nil
					continue
				}
				if !forward(v2) {
					return
				}
			}
		}
	}(ctx, c1, c2, r)
	return r
}
82
// _Filter calls f on each value read from c and sends the values for
// which f returns true on the returned channel.
//
// A goroutine does the work. It keeps running until c is closed or ctx
// is canceled, at which point the returned channel is closed.
// Cancellation is also honored while an accepted value is waiting to be
// delivered, so the goroutine does not leak when the consumer stops
// reading; such an undelivered value is dropped.
func _Filter[Elem any](ctx context.Context, c <-chan Elem, f func(Elem) bool) <-chan Elem {
	r := make(chan Elem)
	go func(ctx context.Context, c <-chan Elem, f func(Elem) bool, r chan<- Elem) {
		defer close(r)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-c:
				if !ok {
					return
				}
				if !f(v) {
					continue
				}
				// Deliver the accepted value, but do not block forever
				// if the context is canceled before anyone reads it.
				select {
				case r <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}(ctx, c, f, r)
	return r
}
107
// _Sink returns a channel whose values are received and thrown away.
// A goroutine drains the channel until the caller closes it or ctx is
// canceled.
func _Sink[Elem any](ctx context.Context) chan<- Elem {
	ch := make(chan Elem)
	// drain discards everything arriving on in until it is closed or
	// the context is done.
	drain := func(ctx context.Context, in <-chan Elem) {
		for {
			select {
			case _, open := <-in:
				if !open {
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}
	go drain(ctx, ch)
	return ch
}
127
// _Exclusive holds a value that only one goroutine may use at a time.
// Instead of a mutex it uses a channel with capacity one: the value
// sits in the channel while it is free and is taken out while it is
// in use.
type _Exclusive[Val any] struct {
	c chan Val
}

// _MakeExclusive returns an _Exclusive that initially holds initial.
func _MakeExclusive[Val any](initial Val) *_Exclusive[Val] {
	slot := make(chan Val, 1)
	slot <- initial
	return &_Exclusive[Val]{c: slot}
}

// Acquire takes the value for private use, blocking until it is
// available. The caller must hand it back with Release.
func (e *_Exclusive[Val]) Acquire() Val {
	held := <-e.c
	return held
}

// TryAcquire takes the value if it is available right now and never
// blocks. ok reports whether the value was acquired; when it is true
// the caller must hand the value back with Release.
func (e *_Exclusive[Val]) TryAcquire() (v Val, ok bool) {
	select {
	case v = <-e.c:
		ok = true
	default:
	}
	return v, ok
}

// Release stores v as the new value and makes it available again.
// It panics if the value is not currently acquired.
func (e *_Exclusive[Val]) Release(v Val) {
	select {
	case e.c <- v:
		return
	default:
	}
	panic("_Exclusive Release without Acquire")
}
170
// _Ranger returns a connected _Sender and _Receiver. Values passed to
// the sender's Send method are retrieved with the receiver's Next
// method, and the sender's Close method signals that no more values
// will be sent. Next reports when the sender has been closed, and Send
// reports when the receiver has been freed.
//
// This makes it easy for a goroutine that is sending values to exit
// once the receiver stops reading them.
func _Ranger[Elem any]() (*_Sender[Elem], *_Receiver[Elem]) {
	values := make(chan Elem)
	done := make(chan struct{})

	recv := &_Receiver[Elem]{values: values, done: done}
	// The finalizer closes done, which is what lets Send notice that
	// the receiver is gone.
	runtime.SetFinalizer(recv, (*_Receiver[Elem]).finalize)

	send := &_Sender[Elem]{values: values, done: done}
	return send, recv
}

// A _Sender is the sending half of a pair created by _Ranger.
type _Sender[Elem any] struct {
	values chan<- Elem
	done   <-chan struct{}
}

// Send sends v to the receiver and reports whether it was sent.
// It reports false, without sending, if ctx is done or the receiver
// has been freed.
func (s *_Sender[Elem]) Send(ctx context.Context, v Elem) bool {
	sent := false
	select {
	case s.values <- v:
		sent = true
	case <-s.done:
	case <-ctx.Done():
	}
	return sent
}

// Close tells the receiver that no further values will arrive.
// The _Sender must not be used after Close has been called.
func (s *_Sender[Elem]) Close() {
	close(s.values)
}

// A _Receiver is the receiving half of a pair created by _Ranger.
type _Receiver[Elem any] struct {
	values <-chan Elem
	done   chan<- struct{}
}

// Next returns the next value sent by the sender. The bool result is
// false, and the value is the zero value, if the sender was closed or
// ctx is done.
func (r *_Receiver[Elem]) Next(ctx context.Context) (v Elem, ok bool) {
	select {
	case got, more := <-r.values:
		return got, more
	case <-ctx.Done():
		var zero Elem
		return zero, false
	}
}

// finalize is installed as the receiver's finalizer; it closes done.
func (r *_Receiver[Elem]) finalize() {
	close(r.done)
}
240
241func TestReadAll() {
242	c := make(chan int)
243	go func() {
244		c <- 4
245		c <- 2
246		c <- 5
247		close(c)
248	}()
249	got := _ReadAll(context.Background(), c)
250	want := []int{4, 2, 5}
251	if !_SliceEqual(got, want) {
252		panic(fmt.Sprintf("_ReadAll returned %v, want %v", got, want))
253	}
254}
255
256func TestMerge() {
257	c1 := make(chan int)
258	c2 := make(chan int)
259	go func() {
260		c1 <- 1
261		c1 <- 3
262		c1 <- 5
263		close(c1)
264	}()
265	go func() {
266		c2 <- 2
267		c2 <- 4
268		c2 <- 6
269		close(c2)
270	}()
271	ctx := context.Background()
272	got := _ReadAll(ctx, _Merge(ctx, c1, c2))
273	sort.Ints(got)
274	want := []int{1, 2, 3, 4, 5, 6}
275	if !_SliceEqual(got, want) {
276		panic(fmt.Sprintf("_Merge returned %v, want %v", got, want))
277	}
278}
279
280func TestFilter() {
281	c := make(chan int)
282	go func() {
283		c <- 1
284		c <- 2
285		c <- 3
286		close(c)
287	}()
288	even := func(i int) bool { return i%2 == 0 }
289	ctx := context.Background()
290	got := _ReadAll(ctx, _Filter(ctx, c, even))
291	want := []int{2}
292	if !_SliceEqual(got, want) {
293		panic(fmt.Sprintf("_Filter returned %v, want %v", got, want))
294	}
295}
296
297func TestSink() {
298	c := _Sink[int](context.Background())
299	after := time.NewTimer(time.Minute)
300	defer after.Stop()
301	send := func(v int) {
302		select {
303		case c <- v:
304		case <-after.C:
305			panic("timed out sending to _Sink")
306		}
307	}
308	send(1)
309	send(2)
310	send(3)
311	close(c)
312}
313
314func TestExclusive() {
315	val := 0
316	ex := _MakeExclusive(&val)
317
318	var wg sync.WaitGroup
319	f := func() {
320		defer wg.Done()
321		for i := 0; i < 10; i++ {
322			p := ex.Acquire()
323			(*p)++
324			ex.Release(p)
325		}
326	}
327
328	wg.Add(2)
329	go f()
330	go f()
331
332	wg.Wait()
333	if val != 20 {
334		panic(fmt.Sprintf("after Acquire/Release loop got %d, want 20", val))
335	}
336}
337
338func TestExclusiveTry() {
339	s := ""
340	ex := _MakeExclusive(&s)
341	p, ok := ex.TryAcquire()
342	if !ok {
343		panic("TryAcquire failed")
344	}
345	*p = "a"
346
347	var wg sync.WaitGroup
348	wg.Add(1)
349	go func() {
350		defer wg.Done()
351		_, ok := ex.TryAcquire()
352		if ok {
353			panic(fmt.Sprintf("TryAcquire succeeded unexpectedly"))
354		}
355	}()
356	wg.Wait()
357
358	ex.Release(p)
359
360	p, ok = ex.TryAcquire()
361	if !ok {
362		panic(fmt.Sprintf("TryAcquire failed"))
363	}
364}
365
366func TestRanger() {
367	s, r := _Ranger[int]()
368
369	ctx := context.Background()
370	go func() {
371		// Receive one value then exit.
372		v, ok := r.Next(ctx)
373		if !ok {
374			panic(fmt.Sprintf("did not receive any values"))
375		} else if v != 1 {
376			panic(fmt.Sprintf("received %d, want 1", v))
377		}
378	}()
379
380	c1 := make(chan bool)
381	c2 := make(chan bool)
382	go func() {
383		defer close(c2)
384		if !s.Send(ctx, 1) {
385			panic(fmt.Sprintf("Send failed unexpectedly"))
386		}
387		close(c1)
388		if s.Send(ctx, 2) {
389			panic(fmt.Sprintf("Send succeeded unexpectedly"))
390		}
391	}()
392
393	<-c1
394
395	// Force a garbage collection to try to get the finalizers to run.
396	runtime.GC()
397
398	select {
399	case <-c2:
400	case <-time.After(time.Minute):
401		panic("_Ranger Send should have failed, but timed out")
402	}
403}
404
405func main() {
406	TestReadAll()
407	TestMerge()
408	TestFilter()
409	TestSink()
410	TestExclusive()
411	TestExclusiveTry()
412	TestRanger()
413}
414