1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package topology
8
9import (
10	"fmt"
11	"sync"
12	"sync/atomic"
13	"time"
14)
15
// expiredFunc reports whether a pooled resource has gone stale. The pool consults it when handing a resource out,
// when taking one back, and during maintenance; a true result means the resource is closed instead of being kept.
type expiredFunc func(resource interface{}) bool
19
// closeFunc disposes of a resource that the pool is discarding. Within this file the pool invokes it synchronously
// from Get, Put, Maintain and Clear, and from all of those except Put while the pool mutex is held, so an
// implementation must not call back into pool methods that take the lock.
type closeFunc func(resource interface{})
23
// initFunc builds a brand-new resource. The pool calls it once for every resource it has to create while topping
// itself up to its minimum size.
type initFunc func() (resource interface{})
27
28type resourcePoolConfig struct {
29	MinSize          uint64
30	MaintainInterval time.Duration
31	ExpiredFn        expiredFunc
32	CloseFn          closeFunc
33	InitFn           initFunc
34}
35
36// setup sets defaults in the rpc and checks that the given values are valid
37func (rpc *resourcePoolConfig) setup() error {
38	if rpc.ExpiredFn == nil {
39		return fmt.Errorf("an ExpiredFn is required to create a resource pool")
40	}
41	if rpc.CloseFn == nil {
42		return fmt.Errorf("an CloseFn is required to create a resource pool")
43	}
44	if rpc.MaintainInterval == time.Duration(0) {
45		return fmt.Errorf("unable to have MaintainInterval time of %v", rpc.MaintainInterval)
46	}
47	return nil
48}
49
// resourcePoolElement is one node of the doubly linked list that backs a resourcePool.
type resourcePoolElement struct {
	next  *resourcePoolElement // neighbour towards the end of the list
	prev  *resourcePoolElement // neighbour towards the start of the list
	value interface{}          // the pooled resource itself
}
55
56// resourcePool is a concurrent resource pool
57type resourcePool struct {
58	start, end       *resourcePoolElement
59	size, minSize    uint64
60	expiredFn        expiredFunc
61	closeFn          closeFunc
62	initFn           initFunc
63	maintainTimer    *time.Timer
64	maintainInterval time.Duration
65
66	sync.Mutex
67}
68
69// NewResourcePool creates a new resourcePool instance that is capped to maxSize resources.
70// If maxSize is 0, the pool size will be unbounded.
71func newResourcePool(config resourcePoolConfig) (*resourcePool, error) {
72	err := (&config).setup()
73	if err != nil {
74		return nil, err
75	}
76	rp := &resourcePool{
77		minSize:          config.MinSize,
78		expiredFn:        config.ExpiredFn,
79		closeFn:          config.CloseFn,
80		initFn:           config.InitFn,
81		maintainInterval: config.MaintainInterval,
82	}
83
84	return rp, nil
85}
86
87func (rp *resourcePool) initialize() {
88	rp.Lock()
89	rp.maintainTimer = time.AfterFunc(rp.maintainInterval, rp.Maintain)
90	rp.Unlock()
91
92	rp.Maintain()
93}
94
95// add will add a new rpe to the pool, requires that the resource pool is locked
96func (rp *resourcePool) add(e *resourcePoolElement) {
97	if e == nil {
98		e = &resourcePoolElement{
99			value: rp.initFn(),
100		}
101	}
102
103	e.next = rp.start
104	if rp.start != nil {
105		rp.start.prev = e
106	}
107	rp.start = e
108	if rp.end == nil {
109		rp.end = e
110	}
111	atomic.AddUint64(&rp.size, 1)
112}
113
114// Get returns the first un-stale resource from the pool. If no such resource can be found, nil is returned.
115func (rp *resourcePool) Get() interface{} {
116	rp.Lock()
117	defer rp.Unlock()
118
119	for rp.start != nil {
120		curr := rp.start
121		rp.remove(curr)
122		if !rp.expiredFn(curr.value) {
123			return curr.value
124		}
125		rp.closeFn(curr.value)
126	}
127	return nil
128}
129
130// Put puts the resource back into the pool if it will not exceed the max size of the pool
131func (rp *resourcePool) Put(v interface{}) bool {
132	if rp.expiredFn(v) {
133		rp.closeFn(v)
134		return false
135	}
136
137	rp.Lock()
138	defer rp.Unlock()
139	rp.add(&resourcePoolElement{value: v})
140	return true
141}
142
143// remove removes a rpe from the linked list. Requires that the pool be locked
144func (rp *resourcePool) remove(e *resourcePoolElement) {
145	if e == nil {
146		return
147	}
148
149	if e.prev != nil {
150		e.prev.next = e.next
151	}
152	if e.next != nil {
153		e.next.prev = e.prev
154	}
155	if e == rp.start {
156		rp.start = e.next
157	}
158	if e == rp.end {
159		rp.end = e.prev
160	}
161	atomicSubtract1Uint64(&rp.size)
162}
163
164// Maintain puts the pool back into a state of having a correct number of resources if possible and removes all stale resources
165func (rp *resourcePool) Maintain() {
166	rp.Lock()
167	defer rp.Unlock()
168	for curr := rp.end; curr != nil; curr = curr.prev {
169		if rp.expiredFn(curr.value) {
170			rp.remove(curr)
171			rp.closeFn(curr.value)
172		}
173	}
174
175	for atomic.LoadUint64(&rp.size) < rp.minSize {
176		rp.add(nil)
177	}
178
179	// reset the timer for the background cleanup routine
180	if rp.maintainTimer == nil {
181		rp.maintainTimer = time.AfterFunc(rp.maintainInterval, rp.Maintain)
182	}
183	if !rp.maintainTimer.Stop() {
184		rp.maintainTimer = time.AfterFunc(rp.maintainInterval, rp.Maintain)
185		return
186	}
187	rp.maintainTimer.Reset(rp.maintainInterval)
188}
189
190// Close clears the pool and stops the background maintenance of the pool
191func (rp *resourcePool) Close() {
192	rp.Clear()
193	_ = rp.maintainTimer.Stop()
194}
195
196// Clear closes all resources in the pool
197func (rp *resourcePool) Clear() {
198	rp.Lock()
199	defer rp.Unlock()
200	for ; rp.start != nil; rp.start = rp.start.next {
201		rp.closeFn(rp.start.value)
202	}
203	atomic.StoreUint64(&rp.size, 0)
204	rp.end = nil
205}
206
// atomicSubtract1Uint64 atomically decrements *p by one, saturating at zero: a nil p, or a value that is already
// zero, is left alone.
func atomicSubtract1Uint64(p *uint64) {
	if p == nil {
		return
	}

	for {
		current := atomic.LoadUint64(p)
		// The zero check is repeated on every attempt: another goroutine may have brought the value down to zero
		// since the previous load, and subtracting then would wrap around to the maximum uint64.
		if current == 0 {
			return
		}
		if atomic.CompareAndSwapUint64(p, current, current-1) {
			return
		}
	}
}
219