1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package topology 8 9import ( 10 "fmt" 11 "sync" 12 "sync/atomic" 13 "time" 14) 15 16// expiredFunc is the function type used for testing whether or not resources in a resourcePool have stale. It should 17// return true if the resource has stale and can be removed from the pool. 18type expiredFunc func(interface{}) bool 19 20// closeFunc is the function type used to closeConnection resources in a resourcePool. The pool will always call this function 21// asynchronously 22type closeFunc func(interface{}) 23 24// initFunc is the function used to add a resource to the resource pool to maintain minimum size. It returns a new 25// resource each time it is called. 26type initFunc func() interface{} 27 28type resourcePoolConfig struct { 29 MinSize uint64 30 MaintainInterval time.Duration 31 ExpiredFn expiredFunc 32 CloseFn closeFunc 33 InitFn initFunc 34} 35 36// setup sets defaults in the rpc and checks that the given values are valid 37func (rpc *resourcePoolConfig) setup() error { 38 if rpc.ExpiredFn == nil { 39 return fmt.Errorf("an ExpiredFn is required to create a resource pool") 40 } 41 if rpc.CloseFn == nil { 42 return fmt.Errorf("an CloseFn is required to create a resource pool") 43 } 44 if rpc.MaintainInterval == time.Duration(0) { 45 return fmt.Errorf("unable to have MaintainInterval time of %v", rpc.MaintainInterval) 46 } 47 return nil 48} 49 50// resourcePoolElement is a link list element 51type resourcePoolElement struct { 52 next, prev *resourcePoolElement 53 value interface{} 54} 55 56// resourcePool is a concurrent resource pool 57type resourcePool struct { 58 start, end *resourcePoolElement 59 size, minSize uint64 60 expiredFn expiredFunc 61 closeFn closeFunc 62 initFn initFunc 63 maintainTimer *time.Timer 64 maintainInterval time.Duration 65 66 sync.Mutex 67} 68 69// NewResourcePool creates a new resourcePool instance that is capped to maxSize resources. 70// If maxSize is 0, the pool size will be unbounded. 71func newResourcePool(config resourcePoolConfig) (*resourcePool, error) { 72 err := (&config).setup() 73 if err != nil { 74 return nil, err 75 } 76 rp := &resourcePool{ 77 minSize: config.MinSize, 78 expiredFn: config.ExpiredFn, 79 closeFn: config.CloseFn, 80 initFn: config.InitFn, 81 maintainInterval: config.MaintainInterval, 82 } 83 84 return rp, nil 85} 86 87func (rp *resourcePool) initialize() { 88 rp.Lock() 89 rp.maintainTimer = time.AfterFunc(rp.maintainInterval, rp.Maintain) 90 rp.Unlock() 91 92 rp.Maintain() 93} 94 95// add will add a new rpe to the pool, requires that the resource pool is locked 96func (rp *resourcePool) add(e *resourcePoolElement) { 97 if e == nil { 98 e = &resourcePoolElement{ 99 value: rp.initFn(), 100 } 101 } 102 103 e.next = rp.start 104 if rp.start != nil { 105 rp.start.prev = e 106 } 107 rp.start = e 108 if rp.end == nil { 109 rp.end = e 110 } 111 atomic.AddUint64(&rp.size, 1) 112} 113 114// Get returns the first un-stale resource from the pool. If no such resource can be found, nil is returned. 115func (rp *resourcePool) Get() interface{} { 116 rp.Lock() 117 defer rp.Unlock() 118 119 for rp.start != nil { 120 curr := rp.start 121 rp.remove(curr) 122 if !rp.expiredFn(curr.value) { 123 return curr.value 124 } 125 rp.closeFn(curr.value) 126 } 127 return nil 128} 129 130// Put puts the resource back into the pool if it will not exceed the max size of the pool 131func (rp *resourcePool) Put(v interface{}) bool { 132 if rp.expiredFn(v) { 133 rp.closeFn(v) 134 return false 135 } 136 137 rp.Lock() 138 defer rp.Unlock() 139 rp.add(&resourcePoolElement{value: v}) 140 return true 141} 142 143// remove removes a rpe from the linked list. Requires that the pool be locked 144func (rp *resourcePool) remove(e *resourcePoolElement) { 145 if e == nil { 146 return 147 } 148 149 if e.prev != nil { 150 e.prev.next = e.next 151 } 152 if e.next != nil { 153 e.next.prev = e.prev 154 } 155 if e == rp.start { 156 rp.start = e.next 157 } 158 if e == rp.end { 159 rp.end = e.prev 160 } 161 atomicSubtract1Uint64(&rp.size) 162} 163 164// Maintain puts the pool back into a state of having a correct number of resources if possible and removes all stale resources 165func (rp *resourcePool) Maintain() { 166 rp.Lock() 167 defer rp.Unlock() 168 for curr := rp.end; curr != nil; curr = curr.prev { 169 if rp.expiredFn(curr.value) { 170 rp.remove(curr) 171 rp.closeFn(curr.value) 172 } 173 } 174 175 for atomic.LoadUint64(&rp.size) < rp.minSize { 176 rp.add(nil) 177 } 178 179 // reset the timer for the background cleanup routine 180 if rp.maintainTimer == nil { 181 rp.maintainTimer = time.AfterFunc(rp.maintainInterval, rp.Maintain) 182 } 183 if !rp.maintainTimer.Stop() { 184 rp.maintainTimer = time.AfterFunc(rp.maintainInterval, rp.Maintain) 185 return 186 } 187 rp.maintainTimer.Reset(rp.maintainInterval) 188} 189 190// Close clears the pool and stops the background maintenance of the pool 191func (rp *resourcePool) Close() { 192 rp.Clear() 193 _ = rp.maintainTimer.Stop() 194} 195 196// Clear closes all resources in the pool 197func (rp *resourcePool) Clear() { 198 rp.Lock() 199 defer rp.Unlock() 200 for ; rp.start != nil; rp.start = rp.start.next { 201 rp.closeFn(rp.start.value) 202 } 203 atomic.StoreUint64(&rp.size, 0) 204 rp.end = nil 205} 206 207func atomicSubtract1Uint64(p *uint64) { 208 if p == nil || atomic.LoadUint64(p) == 0 { 209 return 210 } 211 212 for { 213 expected := atomic.LoadUint64(p) 214 if atomic.CompareAndSwapUint64(p, expected, expected-1) { 215 return 216 } 217 } 218} 219