1// Copyright 2014 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package runtime 6 7// This file contains the implementation of Go channels. 8 9// Invariants: 10// At least one of c.sendq and c.recvq is empty, 11// except for the case of an unbuffered channel with a single goroutine 12// blocked on it for both sending and receiving using a select statement, 13// in which case the length of c.sendq and c.recvq is limited only by the 14// size of the select statement. 15// 16// For buffered channels, also: 17// c.qcount > 0 implies that c.recvq is empty. 18// c.qcount < c.dataqsiz implies that c.sendq is empty. 19 20import ( 21 "runtime/internal/atomic" 22 "runtime/internal/math" 23 "unsafe" 24) 25 26const ( 27 maxAlign = 8 28 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) 29 debugChan = false 30) 31 32type hchan struct { 33 qcount uint // total data in the queue 34 dataqsiz uint // size of the circular queue 35 buf unsafe.Pointer // points to an array of dataqsiz elements 36 elemsize uint16 37 closed uint32 38 elemtype *_type // element type 39 sendx uint // send index 40 recvx uint // receive index 41 recvq waitq // list of recv waiters 42 sendq waitq // list of send waiters 43 44 // lock protects all fields in hchan, as well as several 45 // fields in sudogs blocked on this channel. 46 // 47 // Do not change another G's status while holding this lock 48 // (in particular, do not ready a G), as this can deadlock 49 // with stack shrinking. 50 lock mutex 51} 52 53type waitq struct { 54 first *sudog 55 last *sudog 56} 57 58//go:linkname reflect_makechan reflect.makechan 59func reflect_makechan(t *chantype, size int) *hchan { 60 return makechan(t, size) 61} 62 63func makechan64(t *chantype, size int64) *hchan { 64 if int64(int(size)) != size { 65 panic(plainError("makechan: size out of range")) 66 } 67 68 return makechan(t, int(size)) 69} 70 71func makechan(t *chantype, size int) *hchan { 72 elem := t.elem 73 74 // compiler checks this but be safe. 75 if elem.size >= 1<<16 { 76 throw("makechan: invalid channel element type") 77 } 78 if hchanSize%maxAlign != 0 || elem.align > maxAlign { 79 throw("makechan: bad alignment") 80 } 81 82 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) 83 if overflow || mem > maxAlloc-hchanSize || size < 0 { 84 panic(plainError("makechan: size out of range")) 85 } 86 87 // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. 88 // buf points into the same allocation, elemtype is persistent. 89 // SudoG's are referenced from their owning thread so they can't be collected. 90 // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. 91 var c *hchan 92 switch { 93 case mem == 0: 94 // Queue or element size is zero. 95 c = (*hchan)(mallocgc(hchanSize, nil, true)) 96 // Race detector uses this location for synchronization. 97 c.buf = c.raceaddr() 98 case elem.ptrdata == 0: 99 // Elements do not contain pointers. 100 // Allocate hchan and buf in one call. 101 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) 102 c.buf = add(unsafe.Pointer(c), hchanSize) 103 default: 104 // Elements contain pointers. 105 c = new(hchan) 106 c.buf = mallocgc(mem, elem, true) 107 } 108 109 c.elemsize = uint16(elem.size) 110 c.elemtype = elem 111 c.dataqsiz = uint(size) 112 113 if debugChan { 114 print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") 115 } 116 return c 117} 118 119// chanbuf(c, i) is pointer to the i'th slot in the buffer. 120func chanbuf(c *hchan, i uint) unsafe.Pointer { 121 return add(c.buf, uintptr(i)*uintptr(c.elemsize)) 122} 123 124// entry point for c <- x from compiled code 125//go:nosplit 126func chansend1(c *hchan, elem unsafe.Pointer) { 127 chansend(c, elem, true, getcallerpc()) 128} 129 130/* 131 * generic single channel send/recv 132 * If block is not nil, 133 * then the protocol will not 134 * sleep but return if it could 135 * not complete. 136 * 137 * sleep can wake up with g.param == nil 138 * when a channel involved in the sleep has 139 * been closed. it is easiest to loop and re-run 140 * the operation; we'll see that it's now closed. 141 */ 142func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { 143 if c == nil { 144 if !block { 145 return false 146 } 147 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) 148 throw("unreachable") 149 } 150 151 if debugChan { 152 print("chansend: chan=", c, "\n") 153 } 154 155 if raceenabled { 156 racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) 157 } 158 159 // Fast path: check for failed non-blocking operation without acquiring the lock. 160 // 161 // After observing that the channel is not closed, we observe that the channel is 162 // not ready for sending. Each of these observations is a single word-sized read 163 // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel). 164 // Because a closed channel cannot transition from 'ready for sending' to 165 // 'not ready for sending', even if the channel is closed between the two observations, 166 // they imply a moment between the two when the channel was both not yet closed 167 // and not ready for sending. We behave as if we observed the channel at that moment, 168 // and report that the send cannot proceed. 169 // 170 // It is okay if the reads are reordered here: if we observe that the channel is not 171 // ready for sending and then observe that it is not closed, that implies that the 172 // channel wasn't closed during the first observation. 173 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || 174 (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { 175 return false 176 } 177 178 var t0 int64 179 if blockprofilerate > 0 { 180 t0 = cputicks() 181 } 182 183 lock(&c.lock) 184 185 if c.closed != 0 { 186 unlock(&c.lock) 187 panic(plainError("send on closed channel")) 188 } 189 190 if sg := c.recvq.dequeue(); sg != nil { 191 // Found a waiting receiver. We pass the value we want to send 192 // directly to the receiver, bypassing the channel buffer (if any). 193 send(c, sg, ep, func() { unlock(&c.lock) }, 3) 194 return true 195 } 196 197 if c.qcount < c.dataqsiz { 198 // Space is available in the channel buffer. Enqueue the element to send. 199 qp := chanbuf(c, c.sendx) 200 if raceenabled { 201 raceacquire(qp) 202 racerelease(qp) 203 } 204 typedmemmove(c.elemtype, qp, ep) 205 c.sendx++ 206 if c.sendx == c.dataqsiz { 207 c.sendx = 0 208 } 209 c.qcount++ 210 unlock(&c.lock) 211 return true 212 } 213 214 if !block { 215 unlock(&c.lock) 216 return false 217 } 218 219 // Block on the channel. Some receiver will complete our operation for us. 220 gp := getg() 221 mysg := acquireSudog() 222 mysg.releasetime = 0 223 if t0 != 0 { 224 mysg.releasetime = -1 225 } 226 // No stack splits between assigning elem and enqueuing mysg 227 // on gp.waiting where copystack can find it. 228 mysg.elem = ep 229 mysg.waitlink = nil 230 mysg.g = gp 231 mysg.isSelect = false 232 mysg.c = c 233 gp.waiting = mysg 234 gp.param = nil 235 c.sendq.enqueue(mysg) 236 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) 237 // Ensure the value being sent is kept alive until the 238 // receiver copies it out. The sudog has a pointer to the 239 // stack object, but sudogs aren't considered as roots of the 240 // stack tracer. 241 KeepAlive(ep) 242 243 // someone woke us up. 244 if mysg != gp.waiting { 245 throw("G waiting list is corrupted") 246 } 247 gp.waiting = nil 248 gp.activeStackChans = false 249 if gp.param == nil { 250 if c.closed == 0 { 251 throw("chansend: spurious wakeup") 252 } 253 panic(plainError("send on closed channel")) 254 } 255 gp.param = nil 256 if mysg.releasetime > 0 { 257 blockevent(mysg.releasetime-t0, 2) 258 } 259 mysg.c = nil 260 releaseSudog(mysg) 261 return true 262} 263 264// send processes a send operation on an empty channel c. 265// The value ep sent by the sender is copied to the receiver sg. 266// The receiver is then woken up to go on its merry way. 267// Channel c must be empty and locked. send unlocks c with unlockf. 268// sg must already be dequeued from c. 269// ep must be non-nil and point to the heap or the caller's stack. 270func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { 271 if raceenabled { 272 if c.dataqsiz == 0 { 273 racesync(c, sg) 274 } else { 275 // Pretend we go through the buffer, even though 276 // we copy directly. Note that we need to increment 277 // the head/tail locations only when raceenabled. 278 qp := chanbuf(c, c.recvx) 279 raceacquire(qp) 280 racerelease(qp) 281 raceacquireg(sg.g, qp) 282 racereleaseg(sg.g, qp) 283 c.recvx++ 284 if c.recvx == c.dataqsiz { 285 c.recvx = 0 286 } 287 c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz 288 } 289 } 290 if sg.elem != nil { 291 sendDirect(c.elemtype, sg, ep) 292 sg.elem = nil 293 } 294 gp := sg.g 295 unlockf() 296 gp.param = unsafe.Pointer(sg) 297 if sg.releasetime != 0 { 298 sg.releasetime = cputicks() 299 } 300 goready(gp, skip+1) 301} 302 303// Sends and receives on unbuffered or empty-buffered channels are the 304// only operations where one running goroutine writes to the stack of 305// another running goroutine. The GC assumes that stack writes only 306// happen when the goroutine is running and are only done by that 307// goroutine. Using a write barrier is sufficient to make up for 308// violating that assumption, but the write barrier has to work. 309// typedmemmove will call bulkBarrierPreWrite, but the target bytes 310// are not in the heap, so that will not help. We arrange to call 311// memmove and typeBitsBulkBarrier instead. 312 313func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { 314 // src is on our stack, dst is a slot on another stack. 315 316 // Once we read sg.elem out of sg, it will no longer 317 // be updated if the destination's stack gets copied (shrunk). 318 // So make sure that no preemption points can happen between read & use. 319 dst := sg.elem 320 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) 321 // No need for cgo write barrier checks because dst is always 322 // Go memory. 323 memmove(dst, src, t.size) 324} 325 326func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { 327 // dst is on our stack or the heap, src is on another stack. 328 // The channel is locked, so src will not move during this 329 // operation. 330 src := sg.elem 331 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) 332 memmove(dst, src, t.size) 333} 334 335func closechan(c *hchan) { 336 if c == nil { 337 panic(plainError("close of nil channel")) 338 } 339 340 lock(&c.lock) 341 if c.closed != 0 { 342 unlock(&c.lock) 343 panic(plainError("close of closed channel")) 344 } 345 346 if raceenabled { 347 callerpc := getcallerpc() 348 racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) 349 racerelease(c.raceaddr()) 350 } 351 352 c.closed = 1 353 354 var glist gList 355 356 // release all readers 357 for { 358 sg := c.recvq.dequeue() 359 if sg == nil { 360 break 361 } 362 if sg.elem != nil { 363 typedmemclr(c.elemtype, sg.elem) 364 sg.elem = nil 365 } 366 if sg.releasetime != 0 { 367 sg.releasetime = cputicks() 368 } 369 gp := sg.g 370 gp.param = nil 371 if raceenabled { 372 raceacquireg(gp, c.raceaddr()) 373 } 374 glist.push(gp) 375 } 376 377 // release all writers (they will panic) 378 for { 379 sg := c.sendq.dequeue() 380 if sg == nil { 381 break 382 } 383 sg.elem = nil 384 if sg.releasetime != 0 { 385 sg.releasetime = cputicks() 386 } 387 gp := sg.g 388 gp.param = nil 389 if raceenabled { 390 raceacquireg(gp, c.raceaddr()) 391 } 392 glist.push(gp) 393 } 394 unlock(&c.lock) 395 396 // Ready all Gs now that we've dropped the channel lock. 397 for !glist.empty() { 398 gp := glist.pop() 399 gp.schedlink = 0 400 goready(gp, 3) 401 } 402} 403 404// entry points for <- c from compiled code 405//go:nosplit 406func chanrecv1(c *hchan, elem unsafe.Pointer) { 407 chanrecv(c, elem, true) 408} 409 410//go:nosplit 411func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { 412 _, received = chanrecv(c, elem, true) 413 return 414} 415 416// chanrecv receives on channel c and writes the received data to ep. 417// ep may be nil, in which case received data is ignored. 418// If block == false and no elements are available, returns (false, false). 419// Otherwise, if c is closed, zeros *ep and returns (true, false). 420// Otherwise, fills in *ep with an element and returns (true, true). 421// A non-nil ep must point to the heap or the caller's stack. 422func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { 423 // raceenabled: don't need to check ep, as it is always on the stack 424 // or is new memory allocated by reflect. 425 426 if debugChan { 427 print("chanrecv: chan=", c, "\n") 428 } 429 430 if c == nil { 431 if !block { 432 return 433 } 434 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) 435 throw("unreachable") 436 } 437 438 // Fast path: check for failed non-blocking operation without acquiring the lock. 439 // 440 // After observing that the channel is not ready for receiving, we observe that the 441 // channel is not closed. Each of these observations is a single word-sized read 442 // (first c.sendq.first or c.qcount, and second c.closed). 443 // Because a channel cannot be reopened, the later observation of the channel 444 // being not closed implies that it was also not closed at the moment of the 445 // first observation. We behave as if we observed the channel at that moment 446 // and report that the receive cannot proceed. 447 // 448 // The order of operations is important here: reversing the operations can lead to 449 // incorrect behavior when racing with a close. 450 if !block && (c.dataqsiz == 0 && c.sendq.first == nil || 451 c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && 452 atomic.Load(&c.closed) == 0 { 453 return 454 } 455 456 var t0 int64 457 if blockprofilerate > 0 { 458 t0 = cputicks() 459 } 460 461 lock(&c.lock) 462 463 if c.closed != 0 && c.qcount == 0 { 464 if raceenabled { 465 raceacquire(c.raceaddr()) 466 } 467 unlock(&c.lock) 468 if ep != nil { 469 typedmemclr(c.elemtype, ep) 470 } 471 return true, false 472 } 473 474 if sg := c.sendq.dequeue(); sg != nil { 475 // Found a waiting sender. If buffer is size 0, receive value 476 // directly from sender. Otherwise, receive from head of queue 477 // and add sender's value to the tail of the queue (both map to 478 // the same buffer slot because the queue is full). 479 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) 480 return true, true 481 } 482 483 if c.qcount > 0 { 484 // Receive directly from queue 485 qp := chanbuf(c, c.recvx) 486 if raceenabled { 487 raceacquire(qp) 488 racerelease(qp) 489 } 490 if ep != nil { 491 typedmemmove(c.elemtype, ep, qp) 492 } 493 typedmemclr(c.elemtype, qp) 494 c.recvx++ 495 if c.recvx == c.dataqsiz { 496 c.recvx = 0 497 } 498 c.qcount-- 499 unlock(&c.lock) 500 return true, true 501 } 502 503 if !block { 504 unlock(&c.lock) 505 return false, false 506 } 507 508 // no sender available: block on this channel. 509 gp := getg() 510 mysg := acquireSudog() 511 mysg.releasetime = 0 512 if t0 != 0 { 513 mysg.releasetime = -1 514 } 515 // No stack splits between assigning elem and enqueuing mysg 516 // on gp.waiting where copystack can find it. 517 mysg.elem = ep 518 mysg.waitlink = nil 519 gp.waiting = mysg 520 mysg.g = gp 521 mysg.isSelect = false 522 mysg.c = c 523 gp.param = nil 524 c.recvq.enqueue(mysg) 525 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) 526 527 // someone woke us up 528 if mysg != gp.waiting { 529 throw("G waiting list is corrupted") 530 } 531 gp.waiting = nil 532 gp.activeStackChans = false 533 if mysg.releasetime > 0 { 534 blockevent(mysg.releasetime-t0, 2) 535 } 536 closed := gp.param == nil 537 gp.param = nil 538 mysg.c = nil 539 releaseSudog(mysg) 540 return true, !closed 541} 542 543// recv processes a receive operation on a full channel c. 544// There are 2 parts: 545// 1) The value sent by the sender sg is put into the channel 546// and the sender is woken up to go on its merry way. 547// 2) The value received by the receiver (the current G) is 548// written to ep. 549// For synchronous channels, both values are the same. 550// For asynchronous channels, the receiver gets its data from 551// the channel buffer and the sender's data is put in the 552// channel buffer. 553// Channel c must be full and locked. recv unlocks c with unlockf. 554// sg must already be dequeued from c. 555// A non-nil ep must point to the heap or the caller's stack. 556func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { 557 if c.dataqsiz == 0 { 558 if raceenabled { 559 racesync(c, sg) 560 } 561 if ep != nil { 562 // copy data from sender 563 recvDirect(c.elemtype, sg, ep) 564 } 565 } else { 566 // Queue is full. Take the item at the 567 // head of the queue. Make the sender enqueue 568 // its item at the tail of the queue. Since the 569 // queue is full, those are both the same slot. 570 qp := chanbuf(c, c.recvx) 571 if raceenabled { 572 raceacquire(qp) 573 racerelease(qp) 574 raceacquireg(sg.g, qp) 575 racereleaseg(sg.g, qp) 576 } 577 // copy data from queue to receiver 578 if ep != nil { 579 typedmemmove(c.elemtype, ep, qp) 580 } 581 // copy data from sender to queue 582 typedmemmove(c.elemtype, qp, sg.elem) 583 c.recvx++ 584 if c.recvx == c.dataqsiz { 585 c.recvx = 0 586 } 587 c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz 588 } 589 sg.elem = nil 590 gp := sg.g 591 unlockf() 592 gp.param = unsafe.Pointer(sg) 593 if sg.releasetime != 0 { 594 sg.releasetime = cputicks() 595 } 596 goready(gp, skip+1) 597} 598 599func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool { 600 // There are unlocked sudogs that point into gp's stack. Stack 601 // copying must lock the channels of those sudogs. 602 gp.activeStackChans = true 603 unlock((*mutex)(chanLock)) 604 return true 605} 606 607// compiler implements 608// 609// select { 610// case c <- v: 611// ... foo 612// default: 613// ... bar 614// } 615// 616// as 617// 618// if selectnbsend(c, v) { 619// ... foo 620// } else { 621// ... bar 622// } 623// 624func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { 625 return chansend(c, elem, false, getcallerpc()) 626} 627 628// compiler implements 629// 630// select { 631// case v = <-c: 632// ... foo 633// default: 634// ... bar 635// } 636// 637// as 638// 639// if selectnbrecv(&v, c) { 640// ... foo 641// } else { 642// ... bar 643// } 644// 645func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { 646 selected, _ = chanrecv(c, elem, false) 647 return 648} 649 650// compiler implements 651// 652// select { 653// case v, ok = <-c: 654// ... foo 655// default: 656// ... bar 657// } 658// 659// as 660// 661// if c != nil && selectnbrecv2(&v, &ok, c) { 662// ... foo 663// } else { 664// ... bar 665// } 666// 667func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) { 668 // TODO(khr): just return 2 values from this function, now that it is in Go. 669 selected, *received = chanrecv(c, elem, false) 670 return 671} 672 673//go:linkname reflect_chansend reflect.chansend 674func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) { 675 return chansend(c, elem, !nb, getcallerpc()) 676} 677 678//go:linkname reflect_chanrecv reflect.chanrecv 679func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) { 680 return chanrecv(c, elem, !nb) 681} 682 683//go:linkname reflect_chanlen reflect.chanlen 684func reflect_chanlen(c *hchan) int { 685 if c == nil { 686 return 0 687 } 688 return int(c.qcount) 689} 690 691//go:linkname reflectlite_chanlen internal/reflectlite.chanlen 692func reflectlite_chanlen(c *hchan) int { 693 if c == nil { 694 return 0 695 } 696 return int(c.qcount) 697} 698 699//go:linkname reflect_chancap reflect.chancap 700func reflect_chancap(c *hchan) int { 701 if c == nil { 702 return 0 703 } 704 return int(c.dataqsiz) 705} 706 707//go:linkname reflect_chanclose reflect.chanclose 708func reflect_chanclose(c *hchan) { 709 closechan(c) 710} 711 712func (q *waitq) enqueue(sgp *sudog) { 713 sgp.next = nil 714 x := q.last 715 if x == nil { 716 sgp.prev = nil 717 q.first = sgp 718 q.last = sgp 719 return 720 } 721 sgp.prev = x 722 x.next = sgp 723 q.last = sgp 724} 725 726func (q *waitq) dequeue() *sudog { 727 for { 728 sgp := q.first 729 if sgp == nil { 730 return nil 731 } 732 y := sgp.next 733 if y == nil { 734 q.first = nil 735 q.last = nil 736 } else { 737 y.prev = nil 738 q.first = y 739 sgp.next = nil // mark as removed (see dequeueSudog) 740 } 741 742 // if a goroutine was put on this queue because of a 743 // select, there is a small window between the goroutine 744 // being woken up by a different case and it grabbing the 745 // channel locks. Once it has the lock 746 // it removes itself from the queue, so we won't see it after that. 747 // We use a flag in the G struct to tell us when someone 748 // else has won the race to signal this goroutine but the goroutine 749 // hasn't removed itself from the queue yet. 750 if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) { 751 continue 752 } 753 754 return sgp 755 } 756} 757 758func (c *hchan) raceaddr() unsafe.Pointer { 759 // Treat read-like and write-like operations on the channel to 760 // happen at this address. Avoid using the address of qcount 761 // or dataqsiz, because the len() and cap() builtins read 762 // those addresses, and we don't want them racing with 763 // operations like close(). 764 return unsafe.Pointer(&c.buf) 765} 766 767func racesync(c *hchan, sg *sudog) { 768 racerelease(chanbuf(c, 0)) 769 raceacquireg(sg.g, chanbuf(c, 0)) 770 racereleaseg(sg.g, chanbuf(c, 0)) 771 raceacquire(chanbuf(c, 0)) 772} 773