// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build aix darwin dragonfly freebsd js,wasm linux netbsd openbsd solaris windows

package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
//	Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
//	Arm edge-triggered notifications for fd. The pd argument is to pass
//	back to netpollready when fd is ready. Return an errno value.
//
// func netpoll(delta int64) gList
//	Poll the network. If delta < 0, block indefinitely. If delta == 0,
//	poll without blocking. If delta > 0, block for up to delta nanoseconds.
//	Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
//	Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
//	Reports whether fd is a file descriptor used by the poller.

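// For orientation, a platform backend's netpoll typically walks the events
// reported by the OS and hands each ready descriptor to netpollready, which
// builds the gList that netpoll returns. The sketch below is illustrative
// only: the event type and its data/readable/writable fields, and the
// osWait helper, are assumptions standing in for epoll/kqueue specifics,
// not part of this interface.
//
//	func netpoll(delta int64) gList {
//		events := osWait(delta) // hypothetical OS-specific wait call
//		var toRun gList
//		for _, ev := range events {
//			var mode int32
//			if ev.readable {
//				mode += 'r'
//			}
//			if ev.writable {
//				mode += 'w'
//			}
//			if mode != 0 {
//				pd := (*pollDesc)(unsafe.Pointer(ev.data))
//				netpollready(&toRun, pd, mode)
//			}
//		}
//		return toRun
//	}
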
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
//          the goroutine commits to park by changing the state to G pointer,
//          or, alternatively, concurrent io notification changes the state to pdReady,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to pdReady or nil respectively
//             and unparks the goroutine.
// nil - none of the above.
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)

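// To make the state machine above concrete, the common lifecycle of the read
// semaphore rg for one blocking read looks like this (a restatement of the
// transitions documented above, not additional states):
//
//	nil       --netpollblock: CAS(0, pdWait)-->         pdWait
//	pdWait    --netpollblockcommit: CAS(pdWait, gp)-->  G pointer (parked)
//	G pointer --netpollunblock on io readiness-->       pdReady, gp unparked
//	pdReady   --netpollblock/pollReset consume it-->    nil
//
// If the readiness notification wins the race while the state is still
// pdWait, it flips pdWait directly to pdReady; netpollblockcommit's CAS then
// fails and the goroutine never parks.
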
const pollBlockSize = 4 * 1024

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// that will blow up when GC starts moving objects.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	everr   bool    // marks event scanning error happened
	user    uint32  // user settable cookie
	rseq    uintptr // protects from stale read timers
	rg      uintptr // pdReady, pdWait, G waiting for read or nil
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wseq    uintptr // protects from stale write timers
	wg      uintptr // pdReady, pdWait, G waiting for write or nil
	wt      timer   // write deadline timer
	wd      int64   // write deadline
}

type pollCache struct {
	lock  mutex
	first *pollDesc
	// PollDesc objects must be type-stable,
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
}

var (
	netpollInitLock mutex
	netpollInited   uint32

	pollcache      pollCache
	netpollWaiters uint32
)

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}

func netpollinited() bool {
	return atomic.Load(&netpollInited) != 0
}

//go:linkname poll_runtime_isPollServerDescriptor internal/poll.runtime_isPollServerDescriptor

// poll_runtime_isPollServerDescriptor reports whether fd is a
// descriptor being used by netpoll.
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
	return netpollIsPollDescriptor(fd)
}

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.everr = false
	pd.rseq++
	pd.rg = 0
	pd.rd = 0
	pd.wseq++
	pd.wg = 0
	pd.wd = 0
	unlock(&pd.lock)

	errno := netpollopen(fd, pd)
	return pd, int(errno)
}

//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("runtime: close polldesc w/o unblock")
	}
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on closing polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on closing polldesc")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}

func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

//go:linkname poll_runtime_pollReset internal/poll.runtime_pollReset
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	if mode == 'r' {
		pd.rg = 0
	} else if mode == 'w' {
		pd.wg = 0
	}
	return 0
}

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	// As for now only Solaris, illumos, and AIX use level-triggered IO.
	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
		// Can happen if timeout has fired and unblocked us,
		// but before we had a chance to run, timeout has been reset.
		// Pretend it has not happened and retry.
	}
	return 0
}

//go:linkname poll_runtime_pollWaitCanceled internal/poll.runtime_pollWaitCanceled
func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
	// This function is used only on windows after a failed attempt to cancel
	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
	for !netpollblock(pd, int32(mode), true) {
	}
}

//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	rd0, wd0 := pd.rd, pd.wd
	combo0 := rd0 > 0 && rd0 == wd0
	if d > 0 {
		d += nanotime()
		if d <= 0 {
			// If the user has a deadline in the future, but the delay calculation
			// overflows, then set the deadline to the maximum possible value.
			d = 1<<63 - 1
		}
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	combo := pd.rd > 0 && pd.rd == pd.wd
	rtf := netpollReadDeadline
	if combo {
		rtf = netpollDeadline
	}
	if pd.rt.f == nil {
		if pd.rd > 0 {
			pd.rt.f = rtf
			// Copy current seq into the timer arg.
			// Timer func will check the seq against current descriptor seq,
			// if they differ the descriptor was reused or timers were reset.
			pd.rt.arg = pd
			pd.rt.seq = pd.rseq
			resettimer(&pd.rt, pd.rd)
		}
	} else if pd.rd != rd0 || combo != combo0 {
		pd.rseq++ // invalidate current timers
		if pd.rd > 0 {
			modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
		} else {
			deltimer(&pd.rt)
			pd.rt.f = nil
		}
	}
	if pd.wt.f == nil {
		if pd.wd > 0 && !combo {
			pd.wt.f = netpollWriteDeadline
			pd.wt.arg = pd
			pd.wt.seq = pd.wseq
			resettimer(&pd.wt, pd.wd)
		}
	} else if pd.wd != wd0 || combo != combo0 {
		pd.wseq++ // invalidate current timers
		if pd.wd > 0 && !combo {
			modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd, pd.wseq)
		} else {
			deltimer(&pd.wt)
			pd.wt.f = nil
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	var rg, wg *g
	if pd.rd < 0 || pd.wd < 0 {
		atomic.StorepNoWB(noescape(unsafe.Pointer(&wg)), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
		if pd.rd < 0 {
			rg = netpollunblock(pd, 'r', false)
		}
		if pd.wd < 0 {
			wg = netpollunblock(pd, 'w', false)
		}
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

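// A concrete reading of the timer plumbing above: internal/poll reaches these
// entry points via go:linkname, and the mode values mirror netpollready. A
// SetDeadline on a net.Conn sets rd and wd to the same instant, so combo is
// true and the single netpollDeadline timer covers both directions, while
// separate read and write deadlines run rt and wt independently. Illustrative
// calls (d is a delay in nanoseconds; d < 0 is already expired):
//
//	poll_runtime_pollSetDeadline(pd, d, 'r'+'w') // combo: rt alone fires netpollDeadline
//	poll_runtime_pollSetDeadline(pd, d, 'r')     // rt fires netpollReadDeadline
//	poll_runtime_pollSetDeadline(pd, -1, 'w')    // past deadline: parked writers unblocked with ErrTimeout
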
//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
func poll_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("runtime: unblock on closing polldesc")
	}
	pd.closing = true
	pd.rseq++
	pd.wseq++
	var rg, wg *g
	atomic.StorepNoWB(noescape(unsafe.Pointer(&rg)), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}

func netpollcheckerr(pd *pollDesc, mode int32) int {
	if pd.closing {
		return 1 // ErrFileClosing or ErrNetClosing
	}
	if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
		return 2 // ErrTimeout
	}
	// Report an event scanning error only on a read event.
	// An error on a write event will be captured in a subsequent
	// write call that is able to report a more specific error.
	if mode == 'r' && pd.everr {
		return 3 // ErrNotPollable
	}
	return 0
}

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		// Bump the count of goroutines waiting for the poller.
		// The scheduler uses this to decide whether to block
		// waiting for the poller if there is nothing else to do.
		atomic.Xadd(&netpollWaiters, 1)
	}
	return r
}

func netpollgoready(gp *g, traceskip int) {
	atomic.Xadd(&netpollWaiters, -1)
	goready(gp, traceskip+1)
}

// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	// need to recheck error states after setting gpp to pdWait
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

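// The recheck after the CAS to pdWait pairs with the store/barrier/load
// pattern in pollUnblock, pollSetDeadline and netpolldeadlineimpl. One
// possible interleaving of netpollblock racing a deadline that fires:
//
//	waiter                                    deadline timer
//	------                                    --------------
//	CAS(rg: 0 -> pdWait)
//	                                          pd.rd = -1
//	                                          StorepNoWB (full barrier)
//	                                          netpollunblock: CAS(rg: pdWait -> 0)
//	netpollcheckerr sees rd < 0, skips gopark
//	Xchg(rg, 0) returns 0 -> false (timed out)
//
// Because the waiter publishes pdWait before rechecking, and the timer side
// stores rd before loading rg, at least one side always observes the other.
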
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdReady || old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

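// netpollunblock returns a *g only when a goroutine was actually parked;
// pdReady and pdWait both collapse to "nothing to hand back". For example,
// duplicate readiness notifications from the OS poller are harmless:
//
//	g1 := netpollunblock(pd, 'r', true) // parked reader: returns it, rg becomes pdReady
//	g2 := netpollunblock(pd, 'r', true) // rg already pdReady: returns nil
//
// so a descriptor can never ready the same goroutine twice.
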
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// Seq arg is seq when the timer was set.
	// If it's stale, ignore the timer event.
	currentSeq := pd.rseq
	if !read {
		currentSeq = pd.wseq
	}
	if seq != currentSeq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("runtime: inconsistent read deadline")
		}
		pd.rd = -1
		atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("runtime: inconsistent write deadline")
		}
		pd.wd = -1
		atomic.StorepNoWB(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 0)
	}
	if wg != nil {
		netpollgoready(wg, 0)
	}
}

func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	unlock(&c.lock)
	return pd
}

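// alloc amortizes persistentalloc calls by carving whole pollBlockSize (4 KB)
// slabs into descriptors and threading them onto the free list; how many fit
// per slab depends on unsafe.Sizeof(pollDesc{}) for the platform. Usage
// mirrors pollOpen/pollClose:
//
//	pd := pollcache.alloc() // pops c.first, refilling from persistentalloc if empty
//	// ... descriptor in use ...
//	pollcache.free(pd) // pushes pd back; slab memory is never returned to the OS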