1/*
2** Zabbix
3** Copyright (C) 2001-2021 Zabbix SIA
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18**/
19
20package memcached
21
22import (
23	"context"
24	"sync"
25	"time"
26
27	"zabbix.com/pkg/uri"
28	"zabbix.com/pkg/zbxerr"
29
30	"github.com/memcachier/mc/v3"
31	"zabbix.com/pkg/log"
32)
33
// poolSize is the value passed as mc.Config.PoolSize for every client built by ConnManager.create.
const poolSize = 1
35
// MCClient declares the memcached operations exposed by a connection.
// In this file both *MCConn and the *stubConn test double provide these methods.
type MCClient interface {
	// Stats returns the statistics for the given key.
	Stats(key string) (mc.McStats, error)
	// NoOp issues a no-op request and reports any error.
	NoOp() error
}
40
// MCConn pairs a memcached client with the time the connection was last accessed.
// ConnManager compares that time against its keepalive interval to find unused connections.
type MCConn struct {
	// client is the underlying mc client, stored by value.
	client mc.Client
	// lastTimeAccess is set when the connection is created and refreshed by updateAccessTime.
	lastTimeAccess time.Time
}
45
46// stubConn for testing
47type stubConn struct {
48	NoOpFunc  func() error
49	StatsFunc func(key string) (mc.McStats, error)
50}
51
52func (c *stubConn) Stats(key string) (mc.McStats, error) {
53	return c.StatsFunc(key)
54}
55
56func (c *stubConn) NoOp() error {
57	return c.NoOpFunc()
58}
59
60// Stats wraps the mc.Client.StatsWithKey function.
61func (conn *MCConn) Stats(key string) (stats mc.McStats, err error) {
62	res, err := conn.client.StatsWithKey(key)
63	if err != nil {
64		return nil, err
65	}
66
67	if len(res) == 0 {
68		return nil, zbxerr.ErrorEmptyResult
69	}
70
71	if len(res) > 1 {
72		panic("unexpected result")
73	}
74
75	// get the only entry of stats
76	for _, stats = range res {
77		break
78	}
79
80	return stats, err
81}
82
83// NoOp wraps the mc.Client.NoOp function.
84func (conn *MCConn) NoOp() error {
85	return conn.client.NoOp()
86}
87
88// updateAccessTime updates the last time a connection was accessed.
89func (conn *MCConn) updateAccessTime() {
90	conn.lastTimeAccess = time.Now()
91}
92
// ConnManager is a thread-safe structure for managing connections.
type ConnManager struct {
	// The embedded mutex is held by GetConnection for the whole get-or-create sequence.
	sync.Mutex
	// connMutex guards the connections map in get, create, closeUnused and closeAll.
	connMutex sync.Mutex
	// connections holds the open connections keyed by URI.
	connections map[uri.URI]*MCConn
	// keepAlive is the idle time after which closeUnused closes a connection;
	// create also passes it to the mc client as TcpKeepAlivePeriod.
	keepAlive time.Duration
	// timeout is passed to the mc client as ConnectionTimeout.
	timeout time.Duration
	// Destroy cancels the context of the housekeeper goroutine, which then closes all connections.
	Destroy context.CancelFunc
}
102
103// NewConnManager initializes connManager structure and runs Go Routine that watches for unused connections.
104func NewConnManager(keepAlive, timeout, hkInterval time.Duration) *ConnManager {
105	ctx, cancel := context.WithCancel(context.Background())
106
107	connMgr := &ConnManager{
108		connections: make(map[uri.URI]*MCConn),
109		keepAlive:   keepAlive,
110		timeout:     timeout,
111		Destroy:     cancel, // Destroy stops originated goroutines and close connections.
112	}
113
114	go connMgr.housekeeper(ctx, hkInterval)
115
116	return connMgr
117}
118
119// closeUnused closes each connection that has not been accessed at least within the keepalive interval.
120func (c *ConnManager) closeUnused() {
121	c.connMutex.Lock()
122	defer c.connMutex.Unlock()
123
124	for uri, conn := range c.connections {
125		if time.Since(conn.lastTimeAccess) > c.keepAlive {
126			conn.client.Quit()
127			delete(c.connections, uri)
128			log.Debugf("[%s] Closed unused connection: %s", pluginName, uri.Addr())
129		}
130	}
131}
132
133// closeAll closes all existed connections.
134func (c *ConnManager) closeAll() {
135	c.connMutex.Lock()
136	for uri, conn := range c.connections {
137		conn.client.Quit()
138		delete(c.connections, uri)
139	}
140	c.connMutex.Unlock()
141}
142
143// housekeeper repeatedly checks for unused connections and close them.
144func (c *ConnManager) housekeeper(ctx context.Context, interval time.Duration) {
145	ticker := time.NewTicker(interval)
146
147	for {
148		select {
149		case <-ctx.Done():
150			ticker.Stop()
151			c.closeAll()
152
153			return
154		case <-ticker.C:
155			c.closeUnused()
156		}
157	}
158}
159
160// create creates a new connection with given credentials.
161func (c *ConnManager) create(uri uri.URI) *MCConn {
162	c.connMutex.Lock()
163	defer c.connMutex.Unlock()
164
165	if _, ok := c.connections[uri]; ok {
166		// Should never happen.
167		panic("connection already exists")
168	}
169
170	client := mc.NewMCwithConfig(
171		uri.String(),
172		uri.User(),
173		uri.Password(),
174		&mc.Config{
175			Hasher:             mc.NewModuloHasher(),
176			Retries:            2,
177			RetryDelay:         200 * time.Millisecond,
178			Failover:           true,
179			ConnectionTimeout:  c.timeout,
180			DownRetryDelay:     60 * time.Second,
181			PoolSize:           poolSize,
182			TcpKeepAlive:       true,
183			TcpKeepAlivePeriod: c.keepAlive,
184			TcpNoDelay:         true,
185		},
186	)
187
188	c.connections[uri] = &MCConn{
189		client:         *client,
190		lastTimeAccess: time.Now(),
191	}
192
193	log.Debugf("[%s] Created new connection: %s", pluginName, uri.Addr())
194
195	return c.connections[uri]
196}
197
198// get returns a connection with given uri if it exists and also updates lastTimeAccess, otherwise returns nil.
199func (c *ConnManager) get(uri uri.URI) *MCConn {
200	c.connMutex.Lock()
201	defer c.connMutex.Unlock()
202
203	if conn, ok := c.connections[uri]; ok {
204		conn.updateAccessTime()
205		return conn
206	}
207
208	return nil
209}
210
211// GetConnection returns an existing connection or creates a new one.
212func (c *ConnManager) GetConnection(uri uri.URI) (conn *MCConn) {
213	c.Lock()
214	defer c.Unlock()
215
216	conn = c.get(uri)
217
218	if conn == nil {
219		conn = c.create(uri)
220	}
221
222	return
223}
224