1/* 2** Zabbix 3** Copyright (C) 2001-2021 Zabbix SIA 4** 5** This program is free software; you can redistribute it and/or modify 6** it under the terms of the GNU General Public License as published by 7** the Free Software Foundation; either version 2 of the License, or 8** (at your option) any later version. 9** 10** This program is distributed in the hope that it will be useful, 11** but WITHOUT ANY WARRANTY; without even the implied warranty of 12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13** GNU General Public License for more details. 14** 15** You should have received a copy of the GNU General Public License 16** along with this program; if not, write to the Free Software 17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 18**/ 19 20package memcached 21 22import ( 23 "context" 24 "sync" 25 "time" 26 27 "zabbix.com/pkg/uri" 28 "zabbix.com/pkg/zbxerr" 29 30 "github.com/memcachier/mc/v3" 31 "zabbix.com/pkg/log" 32) 33 34const poolSize = 1 35 36type MCClient interface { 37 Stats(key string) (mc.McStats, error) 38 NoOp() error 39} 40 41type MCConn struct { 42 client mc.Client 43 lastTimeAccess time.Time 44} 45 46// stubConn for testing 47type stubConn struct { 48 NoOpFunc func() error 49 StatsFunc func(key string) (mc.McStats, error) 50} 51 52func (c *stubConn) Stats(key string) (mc.McStats, error) { 53 return c.StatsFunc(key) 54} 55 56func (c *stubConn) NoOp() error { 57 return c.NoOpFunc() 58} 59 60// Stats wraps the mc.Client.StatsWithKey function. 61func (conn *MCConn) Stats(key string) (stats mc.McStats, err error) { 62 res, err := conn.client.StatsWithKey(key) 63 if err != nil { 64 return nil, err 65 } 66 67 if len(res) == 0 { 68 return nil, zbxerr.ErrorEmptyResult 69 } 70 71 if len(res) > 1 { 72 panic("unexpected result") 73 } 74 75 // get the only entry of stats 76 for _, stats = range res { 77 break 78 } 79 80 return stats, err 81} 82 83// NoOp wraps the mc.Client.NoOp function. 84func (conn *MCConn) NoOp() error { 85 return conn.client.NoOp() 86} 87 88// updateAccessTime updates the last time a connection was accessed. 89func (conn *MCConn) updateAccessTime() { 90 conn.lastTimeAccess = time.Now() 91} 92 93// ConnManager is thread-safe structure for manage connections. 94type ConnManager struct { 95 sync.Mutex 96 connMutex sync.Mutex 97 connections map[uri.URI]*MCConn 98 keepAlive time.Duration 99 timeout time.Duration 100 Destroy context.CancelFunc 101} 102 103// NewConnManager initializes connManager structure and runs Go Routine that watches for unused connections. 104func NewConnManager(keepAlive, timeout, hkInterval time.Duration) *ConnManager { 105 ctx, cancel := context.WithCancel(context.Background()) 106 107 connMgr := &ConnManager{ 108 connections: make(map[uri.URI]*MCConn), 109 keepAlive: keepAlive, 110 timeout: timeout, 111 Destroy: cancel, // Destroy stops originated goroutines and close connections. 112 } 113 114 go connMgr.housekeeper(ctx, hkInterval) 115 116 return connMgr 117} 118 119// closeUnused closes each connection that has not been accessed at least within the keepalive interval. 120func (c *ConnManager) closeUnused() { 121 c.connMutex.Lock() 122 defer c.connMutex.Unlock() 123 124 for uri, conn := range c.connections { 125 if time.Since(conn.lastTimeAccess) > c.keepAlive { 126 conn.client.Quit() 127 delete(c.connections, uri) 128 log.Debugf("[%s] Closed unused connection: %s", pluginName, uri.Addr()) 129 } 130 } 131} 132 133// closeAll closes all existed connections. 134func (c *ConnManager) closeAll() { 135 c.connMutex.Lock() 136 for uri, conn := range c.connections { 137 conn.client.Quit() 138 delete(c.connections, uri) 139 } 140 c.connMutex.Unlock() 141} 142 143// housekeeper repeatedly checks for unused connections and close them. 144func (c *ConnManager) housekeeper(ctx context.Context, interval time.Duration) { 145 ticker := time.NewTicker(interval) 146 147 for { 148 select { 149 case <-ctx.Done(): 150 ticker.Stop() 151 c.closeAll() 152 153 return 154 case <-ticker.C: 155 c.closeUnused() 156 } 157 } 158} 159 160// create creates a new connection with given credentials. 161func (c *ConnManager) create(uri uri.URI) *MCConn { 162 c.connMutex.Lock() 163 defer c.connMutex.Unlock() 164 165 if _, ok := c.connections[uri]; ok { 166 // Should never happen. 167 panic("connection already exists") 168 } 169 170 client := mc.NewMCwithConfig( 171 uri.String(), 172 uri.User(), 173 uri.Password(), 174 &mc.Config{ 175 Hasher: mc.NewModuloHasher(), 176 Retries: 2, 177 RetryDelay: 200 * time.Millisecond, 178 Failover: true, 179 ConnectionTimeout: c.timeout, 180 DownRetryDelay: 60 * time.Second, 181 PoolSize: poolSize, 182 TcpKeepAlive: true, 183 TcpKeepAlivePeriod: c.keepAlive, 184 TcpNoDelay: true, 185 }, 186 ) 187 188 c.connections[uri] = &MCConn{ 189 client: *client, 190 lastTimeAccess: time.Now(), 191 } 192 193 log.Debugf("[%s] Created new connection: %s", pluginName, uri.Addr()) 194 195 return c.connections[uri] 196} 197 198// get returns a connection with given uri if it exists and also updates lastTimeAccess, otherwise returns nil. 199func (c *ConnManager) get(uri uri.URI) *MCConn { 200 c.connMutex.Lock() 201 defer c.connMutex.Unlock() 202 203 if conn, ok := c.connections[uri]; ok { 204 conn.updateAccessTime() 205 return conn 206 } 207 208 return nil 209} 210 211// GetConnection returns an existing connection or creates a new one. 212func (c *ConnManager) GetConnection(uri uri.URI) (conn *MCConn) { 213 c.Lock() 214 defer c.Unlock() 215 216 conn = c.get(uri) 217 218 if conn == nil { 219 conn = c.create(uri) 220 } 221 222 return 223} 224