1package runtime
2
3// This file implements the 'chan' type and send/receive/select operations.
4
5// A channel can be in one of the following states:
6//     empty:
7//       No goroutine is waiting on a send or receive operation. The 'blocked'
8//       member is nil.
9//     recv:
10//       A goroutine tries to receive from the channel. This goroutine is stored
11//       in the 'blocked' member.
12//     send:
//       The reverse of recv. A goroutine tries to send to the channel. This
14//       goroutine is stored in the 'blocked' member.
15//     closed:
16//       The channel is closed. Sends will panic, receives will get a zero value
//       plus optionally the indication that the channel is closed (with the
//       comma-ok value in the coroutine).
19//
20// A send/recv transmission is completed by copying from the data element of the
21// sending coroutine to the data element of the receiving coroutine, and setting
22// the 'comma-ok' value to true.
23// A receive operation on a closed channel is completed by zeroing the data
24// element of the receiving coroutine and setting the 'comma-ok' value to false.
25
26import (
27	"internal/task"
28	"runtime/interrupt"
29	"unsafe"
30)
31
32func chanDebug(ch *channel) {
33	if schedulerDebug {
34		if ch.bufSize > 0 {
35			println("--- channel update:", ch, ch.state.String(), ch.bufSize, ch.bufUsed)
36		} else {
37			println("--- channel update:", ch, ch.state.String())
38		}
39	}
40}
41
// channelBlockedList is a list of channel operations on a specific channel which are currently blocked.
// New entries are linked in at the head of the list, and entries are resumed from the head as well.
type channelBlockedList struct {
	// next is a pointer to the next blocked channel operation on the same channel.
	next *channelBlockedList

	// t is the task associated with this channel operation.
	// If this channel operation is not part of a select, then the pointer field of the state holds the data buffer.
	// If this channel operation is part of a select, then the pointer field of the state holds the receive buffer.
	// If this channel operation is a receive, then the data field should be set to zero when resuming due to channel closure.
	// A nil task marks an unused entry (a select case on a nil channel).
	t *task.Task

	// s is a pointer to the channel select state corresponding to this operation.
	// This will be nil if and only if this channel operation is not part of a select statement.
	// If this is a send operation, then the send buffer can be found in this select state.
	s *chanSelectState

	// allSelectOps is a slice containing all of the channel operations involved with this select statement.
	// Before resuming the task, all other channel operations on this select statement should be canceled by removing them from their corresponding lists.
	allSelectOps []channelBlockedList
}
62
63// remove takes the current list of blocked channel operations and removes the specified operation.
64// This returns the resulting list, or nil if the resulting list is empty.
65// A nil receiver is treated as an empty list.
66func (b *channelBlockedList) remove(old *channelBlockedList) *channelBlockedList {
67	if b == old {
68		return b.next
69	}
70	c := b
71	for ; c != nil && c.next != old; c = c.next {
72	}
73	if c != nil {
74		c.next = old.next
75	}
76	return b
77}
78
// detach removes all other channel operations that are part of the same select statement.
// If the input is not part of a select statement, this is a no-op.
// This must be called before resuming any task blocked on a channel operation in order to ensure that it is not placed on the runqueue twice.
// When removing an operation leaves a channel without blocked operations, the state of that channel is updated to match.
func (b *channelBlockedList) detach() {
	if b.allSelectOps == nil {
		// nothing to do
		return
	}
	// Note: v is a copy of the slice element, so identity checks and list
	// removal use the address of the element itself (&b.allSelectOps[i]).
	for i, v := range b.allSelectOps {
		// cancel all other channel operations that are part of this select statement
		switch {
		case &b.allSelectOps[i] == b:
			// This entry is the one that was already detached.
			continue
		case v.t == nil:
			// This entry is not used (nil channel).
			continue
		}
		v.s.ch.blocked = v.s.ch.blocked.remove(&b.allSelectOps[i])
		if v.s.ch.blocked == nil {
			// That was the last blocked operation on this channel: reset its state.
			if v.s.value == nil {
				// recv operation
				// A closed channel must stay closed.
				if v.s.ch.state != chanStateClosed {
					v.s.ch.state = chanStateEmpty
				}
			} else {
				// send operation
				if v.s.ch.bufUsed == 0 {
					// no values are buffered (this includes unbuffered channels)
					v.s.ch.state = chanStateEmpty
				} else {
					// values remain in the buffer
					v.s.ch.state = chanStateBuf
				}
			}
		}
		chanDebug(v.s.ch)
	}
}
118
// channel is the runtime representation of a Go channel.
// The state field tracks which kind of operation (if any) is currently waiting
// on the channel, and the blocked field is the head of the list of those
// waiting operations (nil when nothing is blocked). The buf* fields describe a
// ring buffer of bufSize elements that is used by buffered channels.
type channel struct {
	elementSize uintptr // the size of one value in this channel
	bufSize     uintptr // size of buffer (in elements)
	state       chanState
	blocked     *channelBlockedList
	bufHead     uintptr        // head index of buffer (next push index)
	bufTail     uintptr        // tail index of buffer (next pop index)
	bufUsed     uintptr        // number of elements currently in buffer
	buf         unsafe.Pointer // pointer to first element of buffer
}
129
130// chanMake creates a new channel with the given element size and buffer length in number of elements.
131// This is a compiler intrinsic.
132func chanMake(elementSize uintptr, bufSize uintptr) *channel {
133	return &channel{
134		elementSize: elementSize,
135		bufSize:     bufSize,
136		buf:         alloc(elementSize * bufSize),
137	}
138}
139
140// Return the number of entries in this chan, called from the len builtin.
141// A nil chan is defined as having length 0.
142//go:inline
143func chanLen(c *channel) int {
144	if c == nil {
145		return 0
146	}
147	return int(c.bufUsed)
148}
149
150// wrapper for use in reflect
151func chanLenUnsafePointer(p unsafe.Pointer) int {
152	c := (*channel)(p)
153	return chanLen(c)
154}
155
156// Return the capacity of this chan, called from the cap builtin.
157// A nil chan is defined as having capacity 0.
158//go:inline
159func chanCap(c *channel) int {
160	if c == nil {
161		return 0
162	}
163	return int(c.bufSize)
164}
165
166// wrapper for use in reflect
167func chanCapUnsafePointer(p unsafe.Pointer) int {
168	c := (*channel)(p)
169	return chanCap(c)
170}
171
172// resumeRX resumes the next receiver and returns the destination pointer.
173// If the ok value is true, then the caller is expected to store a value into this pointer.
174func (ch *channel) resumeRX(ok bool) unsafe.Pointer {
175	// pop a blocked goroutine off the stack
176	var b *channelBlockedList
177	b, ch.blocked = ch.blocked, ch.blocked.next
178
179	// get destination pointer
180	dst := b.t.Ptr
181
182	if !ok {
183		// the result value is zero
184		memzero(dst, ch.elementSize)
185		b.t.Data = 0
186	}
187
188	if b.s != nil {
189		// tell the select op which case resumed
190		b.t.Ptr = unsafe.Pointer(b.s)
191
192		// detach associated operations
193		b.detach()
194	}
195
196	// push task onto runqueue
197	runqueue.Push(b.t)
198
199	return dst
200}
201
202// resumeTX resumes the next sender and returns the source pointer.
203// The caller is expected to read from the value in this pointer before yielding.
204func (ch *channel) resumeTX() unsafe.Pointer {
205	// pop a blocked goroutine off the stack
206	var b *channelBlockedList
207	b, ch.blocked = ch.blocked, ch.blocked.next
208
209	// get source pointer
210	src := b.t.Ptr
211
212	if b.s != nil {
213		// use state's source pointer
214		src = b.s.value
215
216		// tell the select op which case resumed
217		b.t.Ptr = unsafe.Pointer(b.s)
218
219		// detach associated operations
220		b.detach()
221	}
222
223	// push task onto runqueue
224	runqueue.Push(b.t)
225
226	return src
227}
228
229// push value to end of channel if space is available
230// returns whether there was space for the value in the buffer
231func (ch *channel) push(value unsafe.Pointer) bool {
232	// immediately return false if the channel is not buffered
233	if ch.bufSize == 0 {
234		return false
235	}
236
237	// ensure space is available
238	if ch.bufUsed == ch.bufSize {
239		return false
240	}
241
242	// copy value to buffer
243	memcpy(
244		unsafe.Pointer( // pointer to the base of the buffer + offset = pointer to destination element
245			uintptr(ch.buf)+
246				uintptr( // element size * equivalent slice index = offset
247					ch.elementSize* // element size (bytes)
248						ch.bufHead, // index of first available buffer entry
249				),
250		),
251		value,
252		ch.elementSize,
253	)
254
255	// update buffer state
256	ch.bufUsed++
257	ch.bufHead++
258	if ch.bufHead == ch.bufSize {
259		ch.bufHead = 0
260	}
261
262	return true
263}
264
265// pop value from channel buffer if one is available
266// returns whether a value was popped or not
267// result is stored into value pointer
268func (ch *channel) pop(value unsafe.Pointer) bool {
269	// channel is empty
270	if ch.bufUsed == 0 {
271		return false
272	}
273
274	// compute address of source
275	addr := unsafe.Pointer(uintptr(ch.buf) + (ch.elementSize * ch.bufTail))
276
277	// copy value from buffer
278	memcpy(
279		value,
280		addr,
281		ch.elementSize,
282	)
283
284	// zero buffer element to allow garbage collection of value
285	memzero(
286		addr,
287		ch.elementSize,
288	)
289
290	// update buffer state
291	ch.bufUsed--
292
293	// move tail up
294	ch.bufTail++
295	if ch.bufTail == ch.bufSize {
296		ch.bufTail = 0
297	}
298
299	return true
300}
301
302// try to send a value to a channel, without actually blocking
303// returns whether the value was sent
304// will panic if channel is closed
305func (ch *channel) trySend(value unsafe.Pointer) bool {
306	if ch == nil {
307		// send to nil channel blocks forever
308		// this is non-blocking, so just say no
309		return false
310	}
311
312	i := interrupt.Disable()
313
314	switch ch.state {
315	case chanStateEmpty, chanStateBuf:
316		// try to dump the value directly into the buffer
317		if ch.push(value) {
318			ch.state = chanStateBuf
319			interrupt.Restore(i)
320			return true
321		}
322		interrupt.Restore(i)
323		return false
324	case chanStateRecv:
325		// unblock reciever
326		dst := ch.resumeRX(true)
327
328		// copy value to reciever
329		memcpy(dst, value, ch.elementSize)
330
331		// change state to empty if there are no more receivers
332		if ch.blocked == nil {
333			ch.state = chanStateEmpty
334		}
335
336		interrupt.Restore(i)
337		return true
338	case chanStateSend:
339		// something else is already waiting to send
340		interrupt.Restore(i)
341		return false
342	case chanStateClosed:
343		interrupt.Restore(i)
344		runtimePanic("send on closed channel")
345	default:
346		interrupt.Restore(i)
347		runtimePanic("invalid channel state")
348	}
349
350	interrupt.Restore(i)
351	return false
352}
353
354// try to recieve a value from a channel, without really blocking
355// returns whether a value was recieved
356// second return is the comma-ok value
357func (ch *channel) tryRecv(value unsafe.Pointer) (bool, bool) {
358	if ch == nil {
359		// recieve from nil channel blocks forever
360		// this is non-blocking, so just say no
361		return false, false
362	}
363
364	i := interrupt.Disable()
365
366	switch ch.state {
367	case chanStateBuf, chanStateSend:
368		// try to pop the value directly from the buffer
369		if ch.pop(value) {
370			// unblock next sender if applicable
371			if ch.blocked != nil {
372				src := ch.resumeTX()
373
374				// push sender's value into buffer
375				ch.push(src)
376
377				if ch.blocked == nil {
378					// last sender unblocked - update state
379					ch.state = chanStateBuf
380				}
381			}
382
383			if ch.bufUsed == 0 {
384				// channel empty - update state
385				ch.state = chanStateEmpty
386			}
387
388			interrupt.Restore(i)
389			return true, true
390		} else if ch.blocked != nil {
391			// unblock next sender if applicable
392			src := ch.resumeTX()
393
394			// copy sender's value
395			memcpy(value, src, ch.elementSize)
396
397			if ch.blocked == nil {
398				// last sender unblocked - update state
399				ch.state = chanStateEmpty
400			}
401
402			interrupt.Restore(i)
403			return true, true
404		}
405		interrupt.Restore(i)
406		return false, false
407	case chanStateRecv, chanStateEmpty:
408		// something else is already waiting to recieve
409		interrupt.Restore(i)
410		return false, false
411	case chanStateClosed:
412		if ch.pop(value) {
413			interrupt.Restore(i)
414			return true, true
415		}
416
417		// channel closed - nothing to recieve
418		memzero(value, ch.elementSize)
419		interrupt.Restore(i)
420		return true, false
421	default:
422		runtimePanic("invalid channel state")
423	}
424
425	runtimePanic("unreachable")
426	return false, false
427}
428
// chanState describes what is currently stored in or waiting on a channel.
type chanState uint8

const (
	chanStateEmpty  chanState = iota // nothing in channel, no senders/receivers
	chanStateRecv                    // nothing in channel, receivers waiting
	chanStateSend                    // senders waiting, buffer full if present
	chanStateBuf                     // buffer not empty, no senders waiting
	chanStateClosed                  // channel closed
)

// String returns a short human-readable name for the channel state, or
// "invalid" for a value that is not one of the defined states.
func (s chanState) String() string {
	name := "invalid"
	switch s {
	case chanStateEmpty:
		name = "empty"
	case chanStateRecv:
		name = "recv"
	case chanStateSend:
		name = "send"
	case chanStateBuf:
		name = "buffered"
	case chanStateClosed:
		name = "closed"
	}
	return name
}
455
// chanSelectState is a single channel operation (send/recv) in a select
// statement. The ch field is the channel operated on (it may be nil, in which
// case the operation never completes). The value pointer is either nil (for
// receives) or points to the value to send (for sends).
type chanSelectState struct {
	ch    *channel
	value unsafe.Pointer
}
463
464// chanSend sends a single value over the channel.
465// This operation will block unless a value is immediately available.
466// May panic if the channel is closed.
467func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) {
468	i := interrupt.Disable()
469
470	if ch.trySend(value) {
471		// value immediately sent
472		chanDebug(ch)
473		interrupt.Restore(i)
474		return
475	}
476
477	if ch == nil {
478		// A nil channel blocks forever. Do not schedule this goroutine again.
479		interrupt.Restore(i)
480		deadlock()
481	}
482
483	// wait for reciever
484	sender := task.Current()
485	ch.state = chanStateSend
486	sender.Ptr = value
487	*blockedlist = channelBlockedList{
488		next: ch.blocked,
489		t:    sender,
490	}
491	ch.blocked = blockedlist
492	chanDebug(ch)
493	interrupt.Restore(i)
494	task.Pause()
495	sender.Ptr = nil
496}
497
498// chanRecv receives a single value over a channel.
499// It blocks if there is no available value to recieve.
500// The recieved value is copied into the value pointer.
501// Returns the comma-ok value.
502func chanRecv(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) bool {
503	i := interrupt.Disable()
504
505	if rx, ok := ch.tryRecv(value); rx {
506		// value immediately available
507		chanDebug(ch)
508		interrupt.Restore(i)
509		return ok
510	}
511
512	if ch == nil {
513		// A nil channel blocks forever. Do not schedule this goroutine again.
514		interrupt.Restore(i)
515		deadlock()
516	}
517
518	// wait for a value
519	receiver := task.Current()
520	ch.state = chanStateRecv
521	receiver.Ptr, receiver.Data = value, 1
522	*blockedlist = channelBlockedList{
523		next: ch.blocked,
524		t:    receiver,
525	}
526	ch.blocked = blockedlist
527	chanDebug(ch)
528	interrupt.Restore(i)
529	task.Pause()
530	ok := receiver.Data == 1
531	receiver.Ptr, receiver.Data = nil, 0
532	return ok
533}
534
535// chanClose closes the given channel. If this channel has a receiver or is
536// empty, it closes the channel. Else, it panics.
537func chanClose(ch *channel) {
538	if ch == nil {
539		// Not allowed by the language spec.
540		runtimePanic("close of nil channel")
541	}
542	i := interrupt.Disable()
543	switch ch.state {
544	case chanStateClosed:
545		// Not allowed by the language spec.
546		interrupt.Restore(i)
547		runtimePanic("close of closed channel")
548	case chanStateSend:
549		// This panic should ideally on the sending side, not in this goroutine.
550		// But when a goroutine tries to send while the channel is being closed,
551		// that is clearly invalid: the send should have been completed already
552		// before the close.
553		interrupt.Restore(i)
554		runtimePanic("close channel during send")
555	case chanStateRecv:
556		// unblock all receivers with the zero value
557		ch.state = chanStateClosed
558		for ch.blocked != nil {
559			ch.resumeRX(false)
560		}
561	case chanStateEmpty, chanStateBuf:
562		// Easy case. No available sender or receiver.
563	}
564	ch.state = chanStateClosed
565	interrupt.Restore(i)
566	chanDebug(ch)
567}
568
// chanSelect is the runtime implementation of the select statement. This is
// perhaps the most complicated statement in the Go spec. It returns the
// selected index and the 'comma-ok' value.
//
// The recvbuf pointer is where a received value is stored, states describes
// the cases of the select statement, and ops provides one list entry per case
// (indexed like states) that is used while the task is blocked.
//
// TODO: do this in a round-robin fashion (as specified in the Go spec) instead
// of picking the first one that can proceed.
func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) {
	istate := interrupt.Disable()

	if selected, ok := tryChanSelect(recvbuf, states); selected != ^uintptr(0) {
		// one channel was immediately ready
		interrupt.Restore(istate)
		return selected, ok
	}

	// construct blocked operations
	for i, v := range states {
		if v.ch == nil {
			// An operation on a nil channel will never complete.
			// Leave the entry unused (nil task) so that detach skips it.
			ops[i] = channelBlockedList{}
			continue
		}

		// Link this case in at the head of the channel's blocked list.
		ops[i] = channelBlockedList{
			next:         v.ch.blocked,
			t:            task.Current(),
			s:            &states[i],
			allSelectOps: ops,
		}
		v.ch.blocked = &ops[i]
		if v.value == nil {
			// recv
			switch v.ch.state {
			case chanStateEmpty:
				v.ch.state = chanStateRecv
			case chanStateRecv:
				// already in correct state
			default:
				interrupt.Restore(istate)
				runtimePanic("invalid channel state")
			}
		} else {
			// send
			switch v.ch.state {
			case chanStateEmpty:
				v.ch.state = chanStateSend
			case chanStateSend:
				// already in correct state
			case chanStateBuf:
				// already in correct state
			default:
				interrupt.Restore(istate)
				runtimePanic("invalid channel state")
			}
		}
		chanDebug(v.ch)
	}

	// expose rx buffer
	// Data is reset to zero when a receive case is resumed because its channel was closed.
	t := task.Current()
	t.Ptr = recvbuf
	t.Data = 1

	// wait for one case to fire
	interrupt.Restore(istate)
	task.Pause()

	// figure out which one fired and return the ok value
	// The resuming side stored a pointer to the selected element of states in
	// t.Ptr, so the index follows from its offset relative to &states[0].
	return (uintptr(t.Ptr) - uintptr(unsafe.Pointer(&states[0]))) / unsafe.Sizeof(chanSelectState{}), t.Data != 0
}
640
641// tryChanSelect is like chanSelect, but it does a non-blocking select operation.
642func tryChanSelect(recvbuf unsafe.Pointer, states []chanSelectState) (uintptr, bool) {
643	istate := interrupt.Disable()
644
645	// See whether we can receive from one of the channels.
646	for i, state := range states {
647		if state.value == nil {
648			// A receive operation.
649			if rx, ok := state.ch.tryRecv(recvbuf); rx {
650				chanDebug(state.ch)
651				interrupt.Restore(istate)
652				return uintptr(i), ok
653			}
654		} else {
655			// A send operation: state.value is not nil.
656			if state.ch.trySend(state.value) {
657				chanDebug(state.ch)
658				interrupt.Restore(istate)
659				return uintptr(i), true
660			}
661		}
662	}
663
664	interrupt.Restore(istate)
665	return ^uintptr(0), false
666}
667