1package runtime 2 3// This file implements the 'chan' type and send/receive/select operations. 4 5// A channel can be in one of the following states: 6// empty: 7// No goroutine is waiting on a send or receive operation. The 'blocked' 8// member is nil. 9// recv: 10// A goroutine tries to receive from the channel. This goroutine is stored 11// in the 'blocked' member. 12// send: 13// The reverse of send. A goroutine tries to send to the channel. This 14// goroutine is stored in the 'blocked' member. 15// closed: 16// The channel is closed. Sends will panic, receives will get a zero value 17// plus optionally the indication that the channel is zero (with the 18// commao-ok value in the coroutine). 19// 20// A send/recv transmission is completed by copying from the data element of the 21// sending coroutine to the data element of the receiving coroutine, and setting 22// the 'comma-ok' value to true. 23// A receive operation on a closed channel is completed by zeroing the data 24// element of the receiving coroutine and setting the 'comma-ok' value to false. 25 26import ( 27 "internal/task" 28 "runtime/interrupt" 29 "unsafe" 30) 31 32func chanDebug(ch *channel) { 33 if schedulerDebug { 34 if ch.bufSize > 0 { 35 println("--- channel update:", ch, ch.state.String(), ch.bufSize, ch.bufUsed) 36 } else { 37 println("--- channel update:", ch, ch.state.String()) 38 } 39 } 40} 41 42// channelBlockedList is a list of channel operations on a specific channel which are currently blocked. 43type channelBlockedList struct { 44 // next is a pointer to the next blocked channel operation on the same channel. 45 next *channelBlockedList 46 47 // t is the task associated with this channel operation. 48 // If this channel operation is not part of a select, then the pointer field of the state holds the data buffer. 49 // If this channel operation is part of a select, then the pointer field of the state holds the recieve buffer. 50 // If this channel operation is a receive, then the data field should be set to zero when resuming due to channel closure. 51 t *task.Task 52 53 // s is a pointer to the channel select state corresponding to this operation. 54 // This will be nil if and only if this channel operation is not part of a select statement. 55 // If this is a send operation, then the send buffer can be found in this select state. 56 s *chanSelectState 57 58 // allSelectOps is a slice containing all of the channel operations involved with this select statement. 59 // Before resuming the task, all other channel operations on this select statement should be canceled by removing them from their corresponding lists. 60 allSelectOps []channelBlockedList 61} 62 63// remove takes the current list of blocked channel operations and removes the specified operation. 64// This returns the resulting list, or nil if the resulting list is empty. 65// A nil receiver is treated as an empty list. 66func (b *channelBlockedList) remove(old *channelBlockedList) *channelBlockedList { 67 if b == old { 68 return b.next 69 } 70 c := b 71 for ; c != nil && c.next != old; c = c.next { 72 } 73 if c != nil { 74 c.next = old.next 75 } 76 return b 77} 78 79// detatch removes all other channel operations that are part of the same select statement. 80// If the input is not part of a select statement, this is a no-op. 81// This must be called before resuming any task blocked on a channel operation in order to ensure that it is not placed on the runqueue twice. 82func (b *channelBlockedList) detach() { 83 if b.allSelectOps == nil { 84 // nothing to do 85 return 86 } 87 for i, v := range b.allSelectOps { 88 // cancel all other channel operations that are part of this select statement 89 switch { 90 case &b.allSelectOps[i] == b: 91 // This entry is the one that was already detatched. 92 continue 93 case v.t == nil: 94 // This entry is not used (nil channel). 95 continue 96 } 97 v.s.ch.blocked = v.s.ch.blocked.remove(&b.allSelectOps[i]) 98 if v.s.ch.blocked == nil { 99 if v.s.value == nil { 100 // recv operation 101 if v.s.ch.state != chanStateClosed { 102 v.s.ch.state = chanStateEmpty 103 } 104 } else { 105 // send operation 106 if v.s.ch.bufUsed == 0 { 107 // unbuffered channel 108 v.s.ch.state = chanStateEmpty 109 } else { 110 // buffered channel 111 v.s.ch.state = chanStateBuf 112 } 113 } 114 } 115 chanDebug(v.s.ch) 116 } 117} 118 119type channel struct { 120 elementSize uintptr // the size of one value in this channel 121 bufSize uintptr // size of buffer (in elements) 122 state chanState 123 blocked *channelBlockedList 124 bufHead uintptr // head index of buffer (next push index) 125 bufTail uintptr // tail index of buffer (next pop index) 126 bufUsed uintptr // number of elements currently in buffer 127 buf unsafe.Pointer // pointer to first element of buffer 128} 129 130// chanMake creates a new channel with the given element size and buffer length in number of elements. 131// This is a compiler intrinsic. 132func chanMake(elementSize uintptr, bufSize uintptr) *channel { 133 return &channel{ 134 elementSize: elementSize, 135 bufSize: bufSize, 136 buf: alloc(elementSize * bufSize), 137 } 138} 139 140// Return the number of entries in this chan, called from the len builtin. 141// A nil chan is defined as having length 0. 142//go:inline 143func chanLen(c *channel) int { 144 if c == nil { 145 return 0 146 } 147 return int(c.bufUsed) 148} 149 150// wrapper for use in reflect 151func chanLenUnsafePointer(p unsafe.Pointer) int { 152 c := (*channel)(p) 153 return chanLen(c) 154} 155 156// Return the capacity of this chan, called from the cap builtin. 157// A nil chan is defined as having capacity 0. 158//go:inline 159func chanCap(c *channel) int { 160 if c == nil { 161 return 0 162 } 163 return int(c.bufSize) 164} 165 166// wrapper for use in reflect 167func chanCapUnsafePointer(p unsafe.Pointer) int { 168 c := (*channel)(p) 169 return chanCap(c) 170} 171 172// resumeRX resumes the next receiver and returns the destination pointer. 173// If the ok value is true, then the caller is expected to store a value into this pointer. 174func (ch *channel) resumeRX(ok bool) unsafe.Pointer { 175 // pop a blocked goroutine off the stack 176 var b *channelBlockedList 177 b, ch.blocked = ch.blocked, ch.blocked.next 178 179 // get destination pointer 180 dst := b.t.Ptr 181 182 if !ok { 183 // the result value is zero 184 memzero(dst, ch.elementSize) 185 b.t.Data = 0 186 } 187 188 if b.s != nil { 189 // tell the select op which case resumed 190 b.t.Ptr = unsafe.Pointer(b.s) 191 192 // detach associated operations 193 b.detach() 194 } 195 196 // push task onto runqueue 197 runqueue.Push(b.t) 198 199 return dst 200} 201 202// resumeTX resumes the next sender and returns the source pointer. 203// The caller is expected to read from the value in this pointer before yielding. 204func (ch *channel) resumeTX() unsafe.Pointer { 205 // pop a blocked goroutine off the stack 206 var b *channelBlockedList 207 b, ch.blocked = ch.blocked, ch.blocked.next 208 209 // get source pointer 210 src := b.t.Ptr 211 212 if b.s != nil { 213 // use state's source pointer 214 src = b.s.value 215 216 // tell the select op which case resumed 217 b.t.Ptr = unsafe.Pointer(b.s) 218 219 // detach associated operations 220 b.detach() 221 } 222 223 // push task onto runqueue 224 runqueue.Push(b.t) 225 226 return src 227} 228 229// push value to end of channel if space is available 230// returns whether there was space for the value in the buffer 231func (ch *channel) push(value unsafe.Pointer) bool { 232 // immediately return false if the channel is not buffered 233 if ch.bufSize == 0 { 234 return false 235 } 236 237 // ensure space is available 238 if ch.bufUsed == ch.bufSize { 239 return false 240 } 241 242 // copy value to buffer 243 memcpy( 244 unsafe.Pointer( // pointer to the base of the buffer + offset = pointer to destination element 245 uintptr(ch.buf)+ 246 uintptr( // element size * equivalent slice index = offset 247 ch.elementSize* // element size (bytes) 248 ch.bufHead, // index of first available buffer entry 249 ), 250 ), 251 value, 252 ch.elementSize, 253 ) 254 255 // update buffer state 256 ch.bufUsed++ 257 ch.bufHead++ 258 if ch.bufHead == ch.bufSize { 259 ch.bufHead = 0 260 } 261 262 return true 263} 264 265// pop value from channel buffer if one is available 266// returns whether a value was popped or not 267// result is stored into value pointer 268func (ch *channel) pop(value unsafe.Pointer) bool { 269 // channel is empty 270 if ch.bufUsed == 0 { 271 return false 272 } 273 274 // compute address of source 275 addr := unsafe.Pointer(uintptr(ch.buf) + (ch.elementSize * ch.bufTail)) 276 277 // copy value from buffer 278 memcpy( 279 value, 280 addr, 281 ch.elementSize, 282 ) 283 284 // zero buffer element to allow garbage collection of value 285 memzero( 286 addr, 287 ch.elementSize, 288 ) 289 290 // update buffer state 291 ch.bufUsed-- 292 293 // move tail up 294 ch.bufTail++ 295 if ch.bufTail == ch.bufSize { 296 ch.bufTail = 0 297 } 298 299 return true 300} 301 302// try to send a value to a channel, without actually blocking 303// returns whether the value was sent 304// will panic if channel is closed 305func (ch *channel) trySend(value unsafe.Pointer) bool { 306 if ch == nil { 307 // send to nil channel blocks forever 308 // this is non-blocking, so just say no 309 return false 310 } 311 312 i := interrupt.Disable() 313 314 switch ch.state { 315 case chanStateEmpty, chanStateBuf: 316 // try to dump the value directly into the buffer 317 if ch.push(value) { 318 ch.state = chanStateBuf 319 interrupt.Restore(i) 320 return true 321 } 322 interrupt.Restore(i) 323 return false 324 case chanStateRecv: 325 // unblock reciever 326 dst := ch.resumeRX(true) 327 328 // copy value to reciever 329 memcpy(dst, value, ch.elementSize) 330 331 // change state to empty if there are no more receivers 332 if ch.blocked == nil { 333 ch.state = chanStateEmpty 334 } 335 336 interrupt.Restore(i) 337 return true 338 case chanStateSend: 339 // something else is already waiting to send 340 interrupt.Restore(i) 341 return false 342 case chanStateClosed: 343 interrupt.Restore(i) 344 runtimePanic("send on closed channel") 345 default: 346 interrupt.Restore(i) 347 runtimePanic("invalid channel state") 348 } 349 350 interrupt.Restore(i) 351 return false 352} 353 354// try to recieve a value from a channel, without really blocking 355// returns whether a value was recieved 356// second return is the comma-ok value 357func (ch *channel) tryRecv(value unsafe.Pointer) (bool, bool) { 358 if ch == nil { 359 // recieve from nil channel blocks forever 360 // this is non-blocking, so just say no 361 return false, false 362 } 363 364 i := interrupt.Disable() 365 366 switch ch.state { 367 case chanStateBuf, chanStateSend: 368 // try to pop the value directly from the buffer 369 if ch.pop(value) { 370 // unblock next sender if applicable 371 if ch.blocked != nil { 372 src := ch.resumeTX() 373 374 // push sender's value into buffer 375 ch.push(src) 376 377 if ch.blocked == nil { 378 // last sender unblocked - update state 379 ch.state = chanStateBuf 380 } 381 } 382 383 if ch.bufUsed == 0 { 384 // channel empty - update state 385 ch.state = chanStateEmpty 386 } 387 388 interrupt.Restore(i) 389 return true, true 390 } else if ch.blocked != nil { 391 // unblock next sender if applicable 392 src := ch.resumeTX() 393 394 // copy sender's value 395 memcpy(value, src, ch.elementSize) 396 397 if ch.blocked == nil { 398 // last sender unblocked - update state 399 ch.state = chanStateEmpty 400 } 401 402 interrupt.Restore(i) 403 return true, true 404 } 405 interrupt.Restore(i) 406 return false, false 407 case chanStateRecv, chanStateEmpty: 408 // something else is already waiting to recieve 409 interrupt.Restore(i) 410 return false, false 411 case chanStateClosed: 412 if ch.pop(value) { 413 interrupt.Restore(i) 414 return true, true 415 } 416 417 // channel closed - nothing to recieve 418 memzero(value, ch.elementSize) 419 interrupt.Restore(i) 420 return true, false 421 default: 422 runtimePanic("invalid channel state") 423 } 424 425 runtimePanic("unreachable") 426 return false, false 427} 428 429type chanState uint8 430 431const ( 432 chanStateEmpty chanState = iota // nothing in channel, no senders/recievers 433 chanStateRecv // nothing in channel, recievers waiting 434 chanStateSend // senders waiting, buffer full if present 435 chanStateBuf // buffer not empty, no senders waiting 436 chanStateClosed // channel closed 437) 438 439func (s chanState) String() string { 440 switch s { 441 case chanStateEmpty: 442 return "empty" 443 case chanStateRecv: 444 return "recv" 445 case chanStateSend: 446 return "send" 447 case chanStateBuf: 448 return "buffered" 449 case chanStateClosed: 450 return "closed" 451 default: 452 return "invalid" 453 } 454} 455 456// chanSelectState is a single channel operation (send/recv) in a select 457// statement. The value pointer is either nil (for receives) or points to the 458// value to send (for sends). 459type chanSelectState struct { 460 ch *channel 461 value unsafe.Pointer 462} 463 464// chanSend sends a single value over the channel. 465// This operation will block unless a value is immediately available. 466// May panic if the channel is closed. 467func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) { 468 i := interrupt.Disable() 469 470 if ch.trySend(value) { 471 // value immediately sent 472 chanDebug(ch) 473 interrupt.Restore(i) 474 return 475 } 476 477 if ch == nil { 478 // A nil channel blocks forever. Do not schedule this goroutine again. 479 interrupt.Restore(i) 480 deadlock() 481 } 482 483 // wait for reciever 484 sender := task.Current() 485 ch.state = chanStateSend 486 sender.Ptr = value 487 *blockedlist = channelBlockedList{ 488 next: ch.blocked, 489 t: sender, 490 } 491 ch.blocked = blockedlist 492 chanDebug(ch) 493 interrupt.Restore(i) 494 task.Pause() 495 sender.Ptr = nil 496} 497 498// chanRecv receives a single value over a channel. 499// It blocks if there is no available value to recieve. 500// The recieved value is copied into the value pointer. 501// Returns the comma-ok value. 502func chanRecv(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) bool { 503 i := interrupt.Disable() 504 505 if rx, ok := ch.tryRecv(value); rx { 506 // value immediately available 507 chanDebug(ch) 508 interrupt.Restore(i) 509 return ok 510 } 511 512 if ch == nil { 513 // A nil channel blocks forever. Do not schedule this goroutine again. 514 interrupt.Restore(i) 515 deadlock() 516 } 517 518 // wait for a value 519 receiver := task.Current() 520 ch.state = chanStateRecv 521 receiver.Ptr, receiver.Data = value, 1 522 *blockedlist = channelBlockedList{ 523 next: ch.blocked, 524 t: receiver, 525 } 526 ch.blocked = blockedlist 527 chanDebug(ch) 528 interrupt.Restore(i) 529 task.Pause() 530 ok := receiver.Data == 1 531 receiver.Ptr, receiver.Data = nil, 0 532 return ok 533} 534 535// chanClose closes the given channel. If this channel has a receiver or is 536// empty, it closes the channel. Else, it panics. 537func chanClose(ch *channel) { 538 if ch == nil { 539 // Not allowed by the language spec. 540 runtimePanic("close of nil channel") 541 } 542 i := interrupt.Disable() 543 switch ch.state { 544 case chanStateClosed: 545 // Not allowed by the language spec. 546 interrupt.Restore(i) 547 runtimePanic("close of closed channel") 548 case chanStateSend: 549 // This panic should ideally on the sending side, not in this goroutine. 550 // But when a goroutine tries to send while the channel is being closed, 551 // that is clearly invalid: the send should have been completed already 552 // before the close. 553 interrupt.Restore(i) 554 runtimePanic("close channel during send") 555 case chanStateRecv: 556 // unblock all receivers with the zero value 557 ch.state = chanStateClosed 558 for ch.blocked != nil { 559 ch.resumeRX(false) 560 } 561 case chanStateEmpty, chanStateBuf: 562 // Easy case. No available sender or receiver. 563 } 564 ch.state = chanStateClosed 565 interrupt.Restore(i) 566 chanDebug(ch) 567} 568 569// chanSelect is the runtime implementation of the select statement. This is 570// perhaps the most complicated statement in the Go spec. It returns the 571// selected index and the 'comma-ok' value. 572// 573// TODO: do this in a round-robin fashion (as specified in the Go spec) instead 574// of picking the first one that can proceed. 575func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) { 576 istate := interrupt.Disable() 577 578 if selected, ok := tryChanSelect(recvbuf, states); selected != ^uintptr(0) { 579 // one channel was immediately ready 580 interrupt.Restore(istate) 581 return selected, ok 582 } 583 584 // construct blocked operations 585 for i, v := range states { 586 if v.ch == nil { 587 // A nil channel receive will never complete. 588 // A nil channel send would have panicked during tryChanSelect. 589 ops[i] = channelBlockedList{} 590 continue 591 } 592 593 ops[i] = channelBlockedList{ 594 next: v.ch.blocked, 595 t: task.Current(), 596 s: &states[i], 597 allSelectOps: ops, 598 } 599 v.ch.blocked = &ops[i] 600 if v.value == nil { 601 // recv 602 switch v.ch.state { 603 case chanStateEmpty: 604 v.ch.state = chanStateRecv 605 case chanStateRecv: 606 // already in correct state 607 default: 608 interrupt.Restore(istate) 609 runtimePanic("invalid channel state") 610 } 611 } else { 612 // send 613 switch v.ch.state { 614 case chanStateEmpty: 615 v.ch.state = chanStateSend 616 case chanStateSend: 617 // already in correct state 618 case chanStateBuf: 619 // already in correct state 620 default: 621 interrupt.Restore(istate) 622 runtimePanic("invalid channel state") 623 } 624 } 625 chanDebug(v.ch) 626 } 627 628 // expose rx buffer 629 t := task.Current() 630 t.Ptr = recvbuf 631 t.Data = 1 632 633 // wait for one case to fire 634 interrupt.Restore(istate) 635 task.Pause() 636 637 // figure out which one fired and return the ok value 638 return (uintptr(t.Ptr) - uintptr(unsafe.Pointer(&states[0]))) / unsafe.Sizeof(chanSelectState{}), t.Data != 0 639} 640 641// tryChanSelect is like chanSelect, but it does a non-blocking select operation. 642func tryChanSelect(recvbuf unsafe.Pointer, states []chanSelectState) (uintptr, bool) { 643 istate := interrupt.Disable() 644 645 // See whether we can receive from one of the channels. 646 for i, state := range states { 647 if state.value == nil { 648 // A receive operation. 649 if rx, ok := state.ch.tryRecv(recvbuf); rx { 650 chanDebug(state.ch) 651 interrupt.Restore(istate) 652 return uintptr(i), ok 653 } 654 } else { 655 // A send operation: state.value is not nil. 656 if state.ch.trySend(state.value) { 657 chanDebug(state.ch) 658 interrupt.Restore(istate) 659 return uintptr(i), true 660 } 661 } 662 } 663 664 interrupt.Restore(istate) 665 return ^uintptr(0), false 666} 667