1// Copyright 2014 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package runtime 6 7// This file contains the implementation of Go channels. 8 9// Invariants: 10// At least one of c.sendq and c.recvq is empty, 11// except for the case of an unbuffered channel with a single goroutine 12// blocked on it for both sending and receiving using a select statement, 13// in which case the length of c.sendq and c.recvq is limited only by the 14// size of the select statement. 15// 16// For buffered channels, also: 17// c.qcount > 0 implies that c.recvq is empty. 18// c.qcount < c.dataqsiz implies that c.sendq is empty. 19 20import ( 21 "runtime/internal/atomic" 22 "runtime/internal/math" 23 "unsafe" 24) 25 26// For gccgo, use go:linkname to export compiler-called functions. 27// 28//go:linkname makechan 29//go:linkname makechan64 30//go:linkname chansend1 31//go:linkname chanrecv1 32//go:linkname chanrecv2 33//go:linkname closechan 34//go:linkname selectnbsend 35//go:linkname selectnbrecv 36//go:linkname selectnbrecv2 37 38const ( 39 maxAlign = 8 40 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) 41 debugChan = false 42) 43 44type hchan struct { 45 qcount uint // total data in the queue 46 dataqsiz uint // size of the circular queue 47 buf unsafe.Pointer // points to an array of dataqsiz elements 48 elemsize uint16 49 closed uint32 50 elemtype *_type // element type 51 sendx uint // send index 52 recvx uint // receive index 53 recvq waitq // list of recv waiters 54 sendq waitq // list of send waiters 55 56 // lock protects all fields in hchan, as well as several 57 // fields in sudogs blocked on this channel. 58 // 59 // Do not change another G's status while holding this lock 60 // (in particular, do not ready a G), as this can deadlock 61 // with stack shrinking. 62 lock mutex 63} 64 65type waitq struct { 66 first *sudog 67 last *sudog 68} 69 70//go:linkname reflect_makechan reflect.makechan 71func reflect_makechan(t *chantype, size int) *hchan { 72 return makechan(t, size) 73} 74 75func makechan64(t *chantype, size int64) *hchan { 76 if int64(int(size)) != size { 77 panic(plainError("makechan: size out of range")) 78 } 79 80 return makechan(t, int(size)) 81} 82 83func makechan(t *chantype, size int) *hchan { 84 elem := t.elem 85 86 // compiler checks this but be safe. 87 if elem.size >= 1<<16 { 88 throw("makechan: invalid channel element type") 89 } 90 if hchanSize%maxAlign != 0 || elem.align > maxAlign { 91 throw("makechan: bad alignment") 92 } 93 94 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) 95 if overflow || mem > maxAlloc-hchanSize || size < 0 { 96 panic(plainError("makechan: size out of range")) 97 } 98 99 // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. 100 // buf points into the same allocation, elemtype is persistent. 101 // SudoG's are referenced from their owning thread so they can't be collected. 102 // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. 103 var c *hchan 104 switch { 105 case mem == 0: 106 // Queue or element size is zero. 107 c = (*hchan)(mallocgc(hchanSize, nil, true)) 108 // Race detector uses this location for synchronization. 109 c.buf = c.raceaddr() 110 case elem.ptrdata == 0: 111 // Elements do not contain pointers. 112 // Allocate hchan and buf in one call. 113 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) 114 c.buf = add(unsafe.Pointer(c), hchanSize) 115 default: 116 // Elements contain pointers. 117 c = new(hchan) 118 c.buf = mallocgc(mem, elem, true) 119 } 120 121 c.elemsize = uint16(elem.size) 122 c.elemtype = elem 123 c.dataqsiz = uint(size) 124 lockInit(&c.lock, lockRankHchan) 125 126 if debugChan { 127 print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") 128 } 129 return c 130} 131 132// chanbuf(c, i) is pointer to the i'th slot in the buffer. 133func chanbuf(c *hchan, i uint) unsafe.Pointer { 134 return add(c.buf, uintptr(i)*uintptr(c.elemsize)) 135} 136 137// full reports whether a send on c would block (that is, the channel is full). 138// It uses a single word-sized read of mutable state, so although 139// the answer is instantaneously true, the correct answer may have changed 140// by the time the calling function receives the return value. 141func full(c *hchan) bool { 142 // c.dataqsiz is immutable (never written after the channel is created) 143 // so it is safe to read at any time during channel operation. 144 if c.dataqsiz == 0 { 145 // Assumes that a pointer read is relaxed-atomic. 146 return c.recvq.first == nil 147 } 148 // Assumes that a uint read is relaxed-atomic. 149 return c.qcount == c.dataqsiz 150} 151 152// entry point for c <- x from compiled code 153//go:nosplit 154func chansend1(c *hchan, elem unsafe.Pointer) { 155 chansend(c, elem, true, getcallerpc()) 156} 157 158/* 159 * generic single channel send/recv 160 * If block is not nil, 161 * then the protocol will not 162 * sleep but return if it could 163 * not complete. 164 * 165 * sleep can wake up with g.param == nil 166 * when a channel involved in the sleep has 167 * been closed. it is easiest to loop and re-run 168 * the operation; we'll see that it's now closed. 169 */ 170func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { 171 // Check preemption, since unlike gc we don't check on every call. 172 if getg().preempt { 173 checkPreempt() 174 } 175 176 if c == nil { 177 if !block { 178 return false 179 } 180 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) 181 throw("unreachable") 182 } 183 184 if debugChan { 185 print("chansend: chan=", c, "\n") 186 } 187 188 if raceenabled { 189 racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) 190 } 191 192 // Fast path: check for failed non-blocking operation without acquiring the lock. 193 // 194 // After observing that the channel is not closed, we observe that the channel is 195 // not ready for sending. Each of these observations is a single word-sized read 196 // (first c.closed and second full()). 197 // Because a closed channel cannot transition from 'ready for sending' to 198 // 'not ready for sending', even if the channel is closed between the two observations, 199 // they imply a moment between the two when the channel was both not yet closed 200 // and not ready for sending. We behave as if we observed the channel at that moment, 201 // and report that the send cannot proceed. 202 // 203 // It is okay if the reads are reordered here: if we observe that the channel is not 204 // ready for sending and then observe that it is not closed, that implies that the 205 // channel wasn't closed during the first observation. However, nothing here 206 // guarantees forward progress. We rely on the side effects of lock release in 207 // chanrecv() and closechan() to update this thread's view of c.closed and full(). 208 if !block && c.closed == 0 && full(c) { 209 return false 210 } 211 212 var t0 int64 213 if blockprofilerate > 0 { 214 t0 = cputicks() 215 } 216 217 lock(&c.lock) 218 219 if c.closed != 0 { 220 unlock(&c.lock) 221 panic(plainError("send on closed channel")) 222 } 223 224 if sg := c.recvq.dequeue(); sg != nil { 225 // Found a waiting receiver. We pass the value we want to send 226 // directly to the receiver, bypassing the channel buffer (if any). 227 send(c, sg, ep, func() { unlock(&c.lock) }, 3) 228 return true 229 } 230 231 if c.qcount < c.dataqsiz { 232 // Space is available in the channel buffer. Enqueue the element to send. 233 qp := chanbuf(c, c.sendx) 234 if raceenabled { 235 racenotify(c, c.sendx, nil) 236 } 237 typedmemmove(c.elemtype, qp, ep) 238 c.sendx++ 239 if c.sendx == c.dataqsiz { 240 c.sendx = 0 241 } 242 c.qcount++ 243 unlock(&c.lock) 244 return true 245 } 246 247 if !block { 248 unlock(&c.lock) 249 return false 250 } 251 252 // Block on the channel. Some receiver will complete our operation for us. 253 gp := getg() 254 mysg := acquireSudog() 255 mysg.releasetime = 0 256 if t0 != 0 { 257 mysg.releasetime = -1 258 } 259 // No stack splits between assigning elem and enqueuing mysg 260 // on gp.waiting where copystack can find it. 261 mysg.elem = ep 262 mysg.waitlink = nil 263 mysg.g = gp 264 mysg.isSelect = false 265 mysg.c = c 266 gp.waiting = mysg 267 gp.param = nil 268 c.sendq.enqueue(mysg) 269 // Signal to anyone trying to shrink our stack that we're about 270 // to park on a channel. The window between when this G's status 271 // changes and when we set gp.activeStackChans is not safe for 272 // stack shrinking. 273 atomic.Store8(&gp.parkingOnChan, 1) 274 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) 275 // Ensure the value being sent is kept alive until the 276 // receiver copies it out. The sudog has a pointer to the 277 // stack object, but sudogs aren't considered as roots of the 278 // stack tracer. 279 KeepAlive(ep) 280 281 // someone woke us up. 282 if mysg != gp.waiting { 283 throw("G waiting list is corrupted") 284 } 285 gp.waiting = nil 286 gp.activeStackChans = false 287 closed := !mysg.success 288 gp.param = nil 289 if mysg.releasetime > 0 { 290 blockevent(mysg.releasetime-t0, 2) 291 } 292 mysg.c = nil 293 releaseSudog(mysg) 294 if closed { 295 if c.closed == 0 { 296 throw("chansend: spurious wakeup") 297 } 298 panic(plainError("send on closed channel")) 299 } 300 return true 301} 302 303// send processes a send operation on an empty channel c. 304// The value ep sent by the sender is copied to the receiver sg. 305// The receiver is then woken up to go on its merry way. 306// Channel c must be empty and locked. send unlocks c with unlockf. 307// sg must already be dequeued from c. 308// ep must be non-nil and point to the heap or the caller's stack. 309func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { 310 if raceenabled { 311 if c.dataqsiz == 0 { 312 racesync(c, sg) 313 } else { 314 // Pretend we go through the buffer, even though 315 // we copy directly. Note that we need to increment 316 // the head/tail locations only when raceenabled. 317 racenotify(c, c.recvx, nil) 318 racenotify(c, c.recvx, sg) 319 c.recvx++ 320 if c.recvx == c.dataqsiz { 321 c.recvx = 0 322 } 323 c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz 324 } 325 } 326 if sg.elem != nil { 327 sendDirect(c.elemtype, sg, ep) 328 sg.elem = nil 329 } 330 gp := sg.g 331 unlockf() 332 gp.param = unsafe.Pointer(sg) 333 sg.success = true 334 if sg.releasetime != 0 { 335 sg.releasetime = cputicks() 336 } 337 goready(gp, skip+1) 338} 339 340// Sends and receives on unbuffered or empty-buffered channels are the 341// only operations where one running goroutine writes to the stack of 342// another running goroutine. The GC assumes that stack writes only 343// happen when the goroutine is running and are only done by that 344// goroutine. Using a write barrier is sufficient to make up for 345// violating that assumption, but the write barrier has to work. 346// typedmemmove will call bulkBarrierPreWrite, but the target bytes 347// are not in the heap, so that will not help. We arrange to call 348// memmove and typeBitsBulkBarrier instead. 349 350func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { 351 // src is on our stack, dst is a slot on another stack. 352 353 // Once we read sg.elem out of sg, it will no longer 354 // be updated if the destination's stack gets copied (shrunk). 355 // So make sure that no preemption points can happen between read & use. 356 dst := sg.elem 357 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) 358 // No need for cgo write barrier checks because dst is always 359 // Go memory. 360 memmove(dst, src, t.size) 361} 362 363func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { 364 // dst is on our stack or the heap, src is on another stack. 365 // The channel is locked, so src will not move during this 366 // operation. 367 src := sg.elem 368 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) 369 memmove(dst, src, t.size) 370} 371 372func closechan(c *hchan) { 373 if c == nil { 374 panic(plainError("close of nil channel")) 375 } 376 377 lock(&c.lock) 378 if c.closed != 0 { 379 unlock(&c.lock) 380 panic(plainError("close of closed channel")) 381 } 382 383 if raceenabled { 384 callerpc := getcallerpc() 385 racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) 386 racerelease(c.raceaddr()) 387 } 388 389 c.closed = 1 390 391 var glist gList 392 393 // release all readers 394 for { 395 sg := c.recvq.dequeue() 396 if sg == nil { 397 break 398 } 399 if sg.elem != nil { 400 typedmemclr(c.elemtype, sg.elem) 401 sg.elem = nil 402 } 403 if sg.releasetime != 0 { 404 sg.releasetime = cputicks() 405 } 406 gp := sg.g 407 gp.param = unsafe.Pointer(sg) 408 sg.success = false 409 if raceenabled { 410 raceacquireg(gp, c.raceaddr()) 411 } 412 glist.push(gp) 413 } 414 415 // release all writers (they will panic) 416 for { 417 sg := c.sendq.dequeue() 418 if sg == nil { 419 break 420 } 421 sg.elem = nil 422 if sg.releasetime != 0 { 423 sg.releasetime = cputicks() 424 } 425 gp := sg.g 426 gp.param = unsafe.Pointer(sg) 427 sg.success = false 428 if raceenabled { 429 raceacquireg(gp, c.raceaddr()) 430 } 431 glist.push(gp) 432 } 433 unlock(&c.lock) 434 435 // Ready all Gs now that we've dropped the channel lock. 436 for !glist.empty() { 437 gp := glist.pop() 438 gp.schedlink = 0 439 goready(gp, 3) 440 } 441} 442 443// empty reports whether a read from c would block (that is, the channel is 444// empty). It uses a single atomic read of mutable state. 445func empty(c *hchan) bool { 446 // c.dataqsiz is immutable. 447 if c.dataqsiz == 0 { 448 return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil 449 } 450 return atomic.Loaduint(&c.qcount) == 0 451} 452 453// entry points for <- c from compiled code 454//go:nosplit 455func chanrecv1(c *hchan, elem unsafe.Pointer) { 456 chanrecv(c, elem, true) 457} 458 459//go:nosplit 460func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { 461 _, received = chanrecv(c, elem, true) 462 return 463} 464 465// chanrecv receives on channel c and writes the received data to ep. 466// ep may be nil, in which case received data is ignored. 467// If block == false and no elements are available, returns (false, false). 468// Otherwise, if c is closed, zeros *ep and returns (true, false). 469// Otherwise, fills in *ep with an element and returns (true, true). 470// A non-nil ep must point to the heap or the caller's stack. 471func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { 472 // raceenabled: don't need to check ep, as it is always on the stack 473 // or is new memory allocated by reflect. 474 475 if debugChan { 476 print("chanrecv: chan=", c, "\n") 477 } 478 479 // Check preemption, since unlike gc we don't check on every call. 480 if getg().preempt { 481 checkPreempt() 482 } 483 484 if c == nil { 485 if !block { 486 return 487 } 488 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) 489 throw("unreachable") 490 } 491 492 // Fast path: check for failed non-blocking operation without acquiring the lock. 493 if !block && empty(c) { 494 // After observing that the channel is not ready for receiving, we observe whether the 495 // channel is closed. 496 // 497 // Reordering of these checks could lead to incorrect behavior when racing with a close. 498 // For example, if the channel was open and not empty, was closed, and then drained, 499 // reordered reads could incorrectly indicate "open and empty". To prevent reordering, 500 // we use atomic loads for both checks, and rely on emptying and closing to happen in 501 // separate critical sections under the same lock. This assumption fails when closing 502 // an unbuffered channel with a blocked send, but that is an error condition anyway. 503 if atomic.Load(&c.closed) == 0 { 504 // Because a channel cannot be reopened, the later observation of the channel 505 // being not closed implies that it was also not closed at the moment of the 506 // first observation. We behave as if we observed the channel at that moment 507 // and report that the receive cannot proceed. 508 return 509 } 510 // The channel is irreversibly closed. Re-check whether the channel has any pending data 511 // to receive, which could have arrived between the empty and closed checks above. 512 // Sequential consistency is also required here, when racing with such a send. 513 if empty(c) { 514 // The channel is irreversibly closed and empty. 515 if raceenabled { 516 raceacquire(c.raceaddr()) 517 } 518 if ep != nil { 519 typedmemclr(c.elemtype, ep) 520 } 521 return true, false 522 } 523 } 524 525 var t0 int64 526 if blockprofilerate > 0 { 527 t0 = cputicks() 528 } 529 530 lock(&c.lock) 531 532 if c.closed != 0 && c.qcount == 0 { 533 if raceenabled { 534 raceacquire(c.raceaddr()) 535 } 536 unlock(&c.lock) 537 if ep != nil { 538 typedmemclr(c.elemtype, ep) 539 } 540 return true, false 541 } 542 543 if sg := c.sendq.dequeue(); sg != nil { 544 // Found a waiting sender. If buffer is size 0, receive value 545 // directly from sender. Otherwise, receive from head of queue 546 // and add sender's value to the tail of the queue (both map to 547 // the same buffer slot because the queue is full). 548 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) 549 return true, true 550 } 551 552 if c.qcount > 0 { 553 // Receive directly from queue 554 qp := chanbuf(c, c.recvx) 555 if raceenabled { 556 racenotify(c, c.recvx, nil) 557 } 558 if ep != nil { 559 typedmemmove(c.elemtype, ep, qp) 560 } 561 typedmemclr(c.elemtype, qp) 562 c.recvx++ 563 if c.recvx == c.dataqsiz { 564 c.recvx = 0 565 } 566 c.qcount-- 567 unlock(&c.lock) 568 return true, true 569 } 570 571 if !block { 572 unlock(&c.lock) 573 return false, false 574 } 575 576 // no sender available: block on this channel. 577 gp := getg() 578 mysg := acquireSudog() 579 mysg.releasetime = 0 580 if t0 != 0 { 581 mysg.releasetime = -1 582 } 583 // No stack splits between assigning elem and enqueuing mysg 584 // on gp.waiting where copystack can find it. 585 mysg.elem = ep 586 mysg.waitlink = nil 587 gp.waiting = mysg 588 mysg.g = gp 589 mysg.isSelect = false 590 mysg.c = c 591 gp.param = nil 592 c.recvq.enqueue(mysg) 593 // Signal to anyone trying to shrink our stack that we're about 594 // to park on a channel. The window between when this G's status 595 // changes and when we set gp.activeStackChans is not safe for 596 // stack shrinking. 597 atomic.Store8(&gp.parkingOnChan, 1) 598 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) 599 600 // someone woke us up 601 if mysg != gp.waiting { 602 throw("G waiting list is corrupted") 603 } 604 gp.waiting = nil 605 gp.activeStackChans = false 606 if mysg.releasetime > 0 { 607 blockevent(mysg.releasetime-t0, 2) 608 } 609 success := mysg.success 610 gp.param = nil 611 mysg.c = nil 612 releaseSudog(mysg) 613 return true, success 614} 615 616// recv processes a receive operation on a full channel c. 617// There are 2 parts: 618// 1) The value sent by the sender sg is put into the channel 619// and the sender is woken up to go on its merry way. 620// 2) The value received by the receiver (the current G) is 621// written to ep. 622// For synchronous channels, both values are the same. 623// For asynchronous channels, the receiver gets its data from 624// the channel buffer and the sender's data is put in the 625// channel buffer. 626// Channel c must be full and locked. recv unlocks c with unlockf. 627// sg must already be dequeued from c. 628// A non-nil ep must point to the heap or the caller's stack. 629func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { 630 if c.dataqsiz == 0 { 631 if raceenabled { 632 racesync(c, sg) 633 } 634 if ep != nil { 635 // copy data from sender 636 recvDirect(c.elemtype, sg, ep) 637 } 638 } else { 639 // Queue is full. Take the item at the 640 // head of the queue. Make the sender enqueue 641 // its item at the tail of the queue. Since the 642 // queue is full, those are both the same slot. 643 qp := chanbuf(c, c.recvx) 644 if raceenabled { 645 racenotify(c, c.recvx, nil) 646 racenotify(c, c.recvx, sg) 647 } 648 // copy data from queue to receiver 649 if ep != nil { 650 typedmemmove(c.elemtype, ep, qp) 651 } 652 // copy data from sender to queue 653 typedmemmove(c.elemtype, qp, sg.elem) 654 c.recvx++ 655 if c.recvx == c.dataqsiz { 656 c.recvx = 0 657 } 658 c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz 659 } 660 sg.elem = nil 661 gp := sg.g 662 unlockf() 663 gp.param = unsafe.Pointer(sg) 664 sg.success = true 665 if sg.releasetime != 0 { 666 sg.releasetime = cputicks() 667 } 668 goready(gp, skip+1) 669} 670 671func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool { 672 // There are unlocked sudogs that point into gp's stack. Stack 673 // copying must lock the channels of those sudogs. 674 // Set activeStackChans here instead of before we try parking 675 // because we could self-deadlock in stack growth on the 676 // channel lock. 677 gp.activeStackChans = true 678 // Mark that it's safe for stack shrinking to occur now, 679 // because any thread acquiring this G's stack for shrinking 680 // is guaranteed to observe activeStackChans after this store. 681 atomic.Store8(&gp.parkingOnChan, 0) 682 // Make sure we unlock after setting activeStackChans and 683 // unsetting parkingOnChan. The moment we unlock chanLock 684 // we risk gp getting readied by a channel operation and 685 // so gp could continue running before everything before 686 // the unlock is visible (even to gp itself). 687 unlock((*mutex)(chanLock)) 688 return true 689} 690 691// compiler implements 692// 693// select { 694// case c <- v: 695// ... foo 696// default: 697// ... bar 698// } 699// 700// as 701// 702// if selectnbsend(c, v) { 703// ... foo 704// } else { 705// ... bar 706// } 707// 708func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { 709 return chansend(c, elem, false, getcallerpc()) 710} 711 712// compiler implements 713// 714// select { 715// case v = <-c: 716// ... foo 717// default: 718// ... bar 719// } 720// 721// as 722// 723// if selectnbrecv(&v, c) { 724// ... foo 725// } else { 726// ... bar 727// } 728// 729func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { 730 selected, _ = chanrecv(c, elem, false) 731 return 732} 733 734// compiler implements 735// 736// select { 737// case v, ok = <-c: 738// ... foo 739// default: 740// ... bar 741// } 742// 743// as 744// 745// if c != nil && selectnbrecv2(&v, &ok, c) { 746// ... foo 747// } else { 748// ... bar 749// } 750// 751func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) { 752 // TODO(khr): just return 2 values from this function, now that it is in Go. 753 selected, *received = chanrecv(c, elem, false) 754 return 755} 756 757//go:linkname reflect_chansend reflect.chansend 758func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) { 759 return chansend(c, elem, !nb, getcallerpc()) 760} 761 762//go:linkname reflect_chanrecv reflect.chanrecv 763func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) { 764 return chanrecv(c, elem, !nb) 765} 766 767//go:linkname reflect_chanlen reflect.chanlen 768func reflect_chanlen(c *hchan) int { 769 if c == nil { 770 return 0 771 } 772 return int(c.qcount) 773} 774 775//go:linkname reflectlite_chanlen internal_1reflectlite.chanlen 776func reflectlite_chanlen(c *hchan) int { 777 if c == nil { 778 return 0 779 } 780 return int(c.qcount) 781} 782 783//go:linkname reflect_chancap reflect.chancap 784func reflect_chancap(c *hchan) int { 785 if c == nil { 786 return 0 787 } 788 return int(c.dataqsiz) 789} 790 791//go:linkname reflect_chanclose reflect.chanclose 792func reflect_chanclose(c *hchan) { 793 closechan(c) 794} 795 796func (q *waitq) enqueue(sgp *sudog) { 797 sgp.next = nil 798 x := q.last 799 if x == nil { 800 sgp.prev = nil 801 q.first = sgp 802 q.last = sgp 803 return 804 } 805 sgp.prev = x 806 x.next = sgp 807 q.last = sgp 808} 809 810func (q *waitq) dequeue() *sudog { 811 for { 812 sgp := q.first 813 if sgp == nil { 814 return nil 815 } 816 y := sgp.next 817 if y == nil { 818 q.first = nil 819 q.last = nil 820 } else { 821 y.prev = nil 822 q.first = y 823 sgp.next = nil // mark as removed (see dequeueSudog) 824 } 825 826 // if a goroutine was put on this queue because of a 827 // select, there is a small window between the goroutine 828 // being woken up by a different case and it grabbing the 829 // channel locks. Once it has the lock 830 // it removes itself from the queue, so we won't see it after that. 831 // We use a flag in the G struct to tell us when someone 832 // else has won the race to signal this goroutine but the goroutine 833 // hasn't removed itself from the queue yet. 834 if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) { 835 continue 836 } 837 838 return sgp 839 } 840} 841 842func (c *hchan) raceaddr() unsafe.Pointer { 843 // Treat read-like and write-like operations on the channel to 844 // happen at this address. Avoid using the address of qcount 845 // or dataqsiz, because the len() and cap() builtins read 846 // those addresses, and we don't want them racing with 847 // operations like close(). 848 return unsafe.Pointer(&c.buf) 849} 850 851func racesync(c *hchan, sg *sudog) { 852 racerelease(chanbuf(c, 0)) 853 raceacquireg(sg.g, chanbuf(c, 0)) 854 racereleaseg(sg.g, chanbuf(c, 0)) 855 raceacquire(chanbuf(c, 0)) 856} 857 858// Notify the race detector of a send or receive involving buffer entry idx 859// and a channel c or its communicating partner sg. 860// This function handles the special case of c.elemsize==0. 861func racenotify(c *hchan, idx uint, sg *sudog) { 862 // We could have passed the unsafe.Pointer corresponding to entry idx 863 // instead of idx itself. However, in a future version of this function, 864 // we can use idx to better handle the case of elemsize==0. 865 // A future improvement to the detector is to call TSan with c and idx: 866 // this way, Go will continue to not allocating buffer entries for channels 867 // of elemsize==0, yet the race detector can be made to handle multiple 868 // sync objects underneath the hood (one sync object per idx) 869 qp := chanbuf(c, idx) 870 // When elemsize==0, we don't allocate a full buffer for the channel. 871 // Instead of individual buffer entries, the race detector uses the 872 // c.buf as the only buffer entry. This simplification prevents us from 873 // following the memory model's happens-before rules (rules that are 874 // implemented in racereleaseacquire). Instead, we accumulate happens-before 875 // information in the synchronization object associated with c.buf. 876 if c.elemsize == 0 { 877 if sg == nil { 878 raceacquire(qp) 879 racerelease(qp) 880 } else { 881 raceacquireg(sg.g, qp) 882 racereleaseg(sg.g, qp) 883 } 884 } else { 885 if sg == nil { 886 racereleaseacquire(qp) 887 } else { 888 racereleaseacquireg(sg.g, qp) 889 } 890 } 891} 892