1/* 2** Zabbix 3** Copyright (C) 2001-2021 Zabbix SIA 4** 5** This program is free software; you can redistribute it and/or modify 6** it under the terms of the GNU General Public License as published by 7** the Free Software Foundation; either version 2 of the License, or 8** (at your option) any later version. 9** 10** This program is distributed in the hope that it will be useful, 11** but WITHOUT ANY WARRANTY; without even the implied warranty of 12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13** GNU General Public License for more details. 14** 15** You should have received a copy of the GNU General Public License 16** along with this program; if not, write to the Free Software 17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 18**/ 19 20package oracle 21 22import ( 23 "context" 24 "database/sql" 25 "fmt" 26 "net/url" 27 "strings" 28 "sync" 29 "time" 30 31 "zabbix.com/pkg/uri" 32 33 "github.com/godror/godror" 34 "github.com/omeid/go-yarn" 35 "zabbix.com/pkg/log" 36 "zabbix.com/pkg/zbxerr" 37) 38 39type OraClient interface { 40 Query(ctx context.Context, query string, args ...interface{}) (rows *sql.Rows, err error) 41 QueryByName(ctx context.Context, queryName string, args ...interface{}) (rows *sql.Rows, err error) 42 QueryRow(ctx context.Context, query string, args ...interface{}) (row *sql.Row, err error) 43 QueryRowByName(ctx context.Context, queryName string, args ...interface{}) (row *sql.Row, err error) 44 WhoAmI() string 45} 46 47type OraConn struct { 48 client *sql.DB 49 callTimeout time.Duration 50 version godror.VersionInfo 51 lastTimeAccess time.Time 52 ctx context.Context 53 queryStorage *yarn.Yarn 54 username string 55} 56 57var errorQueryNotFound = "query %q not found" 58 59// Query wraps DB.QueryContext. 60func (conn *OraConn) Query(ctx context.Context, query string, args ...interface{}) (rows *sql.Rows, err error) { 61 rows, err = conn.client.QueryContext(ctx, query, args...) 62 63 if ctxErr := ctx.Err(); ctxErr != nil { 64 err = ctxErr 65 } 66 67 return 68} 69 70// Query executes a query from queryStorage by its name and returns multiple rows. 71func (conn *OraConn) QueryByName(ctx context.Context, queryName string, 72 args ...interface{}) (rows *sql.Rows, err error) { 73 if sql, ok := (*conn.queryStorage).Get(queryName + sqlExt); ok { 74 normalizedSQL := strings.TrimRight(strings.TrimSpace(sql), ";") 75 76 return conn.Query(ctx, normalizedSQL, args...) 77 } 78 79 return nil, fmt.Errorf(errorQueryNotFound, queryName) 80} 81 82// Query wraps DB.QueryRowContext. 83func (conn *OraConn) QueryRow(ctx context.Context, query string, args ...interface{}) (row *sql.Row, err error) { 84 row = conn.client.QueryRowContext(ctx, query, args...) 85 86 if ctxErr := ctx.Err(); ctxErr != nil { 87 err = ctxErr 88 } 89 90 return 91} 92 93// Query executes a query from queryStorage by its name and returns a single row. 94func (conn *OraConn) QueryRowByName(ctx context.Context, queryName string, 95 args ...interface{}) (row *sql.Row, err error) { 96 if sql, ok := (*conn.queryStorage).Get(queryName + sqlExt); ok { 97 normalizedSQL := strings.TrimRight(strings.TrimSpace(sql), ";") 98 99 return conn.QueryRow(ctx, normalizedSQL, args...) 100 } 101 102 return nil, fmt.Errorf(errorQueryNotFound, queryName) 103} 104 105// WhoAmI returns a current username. 106func (conn *OraConn) WhoAmI() string { 107 return conn.username 108} 109 110// updateAccessTime updates the last time a connection was accessed. 111func (conn *OraConn) updateAccessTime() { 112 conn.lastTimeAccess = time.Now() 113} 114 115// ConnManager is thread-safe structure for manage connections. 116type ConnManager struct { 117 sync.Mutex 118 connMutex sync.Mutex 119 connections map[uri.URI]*OraConn 120 keepAlive time.Duration 121 connectTimeout time.Duration 122 callTimeout time.Duration 123 Destroy context.CancelFunc 124 queryStorage yarn.Yarn 125} 126 127// NewConnManager initializes connManager structure and runs Go Routine that watches for unused connections. 128func NewConnManager(keepAlive, connectTimeout, callTimeout, 129 hkInterval time.Duration, queryStorage yarn.Yarn) *ConnManager { 130 ctx, cancel := context.WithCancel(context.Background()) 131 132 connMgr := &ConnManager{ 133 connections: make(map[uri.URI]*OraConn), 134 keepAlive: keepAlive, 135 connectTimeout: connectTimeout, 136 callTimeout: callTimeout, 137 Destroy: cancel, // Destroy stops originated goroutines and closes connections. 138 queryStorage: queryStorage, 139 } 140 141 go connMgr.housekeeper(ctx, hkInterval) 142 143 return connMgr 144} 145 146// closeUnused closes each connection that has not been accessed at least within the keepalive interval. 147func (c *ConnManager) closeUnused() { 148 c.connMutex.Lock() 149 defer c.connMutex.Unlock() 150 151 for uri, conn := range c.connections { 152 if time.Since(conn.lastTimeAccess) > c.keepAlive { 153 conn.client.Close() 154 delete(c.connections, uri) 155 log.Debugf("[%s] Closed unused connection: %s", pluginName, uri.Addr()) 156 } 157 } 158} 159 160// closeAll closes all existed connections. 161func (c *ConnManager) closeAll() { 162 c.connMutex.Lock() 163 for uri, conn := range c.connections { 164 conn.client.Close() 165 delete(c.connections, uri) 166 } 167 c.connMutex.Unlock() 168} 169 170// housekeeper repeatedly checks for unused connections and closes them. 171func (c *ConnManager) housekeeper(ctx context.Context, interval time.Duration) { 172 ticker := time.NewTicker(interval) 173 174 for { 175 select { 176 case <-ctx.Done(): 177 ticker.Stop() 178 c.closeAll() 179 180 return 181 case <-ticker.C: 182 c.closeUnused() 183 } 184 } 185} 186 187// create creates a new connection with given credentials. 188func (c *ConnManager) create(uri uri.URI) (*OraConn, error) { 189 c.connMutex.Lock() 190 defer c.connMutex.Unlock() 191 192 if _, ok := c.connections[uri]; ok { 193 // Should never happen. 194 panic("connection already exists") 195 } 196 197 ctx := godror.ContextWithTraceTag( 198 context.Background(), 199 godror.TraceTag{ 200 ClientInfo: "zbx_monitor", 201 Module: godror.DriverName, 202 }) 203 204 service, err := url.QueryUnescape(uri.GetParam("service")) 205 if err != nil { 206 return nil, err 207 } 208 209 connectString := fmt.Sprintf(`(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=%s)(PORT=%s))`+ 210 `(CONNECT_DATA=(SERVICE_NAME="%s"))(CONNECT_TIMEOUT=%d)(RETRY_COUNT=0))`, 211 uri.Host(), uri.Port(), service, c.connectTimeout/time.Second) 212 213 connector := godror.NewConnector(godror.ConnectionParams{ 214 StandaloneConnection: true, 215 CommonParams: godror.CommonParams{ 216 Username: uri.User(), 217 ConnectString: connectString, 218 Password: godror.NewPassword(uri.Password()), 219 }, 220 }) 221 222 client := sql.OpenDB(connector) 223 224 serverVersion, err := godror.ServerVersion(ctx, client) 225 if err != nil { 226 return nil, err 227 } 228 229 c.connections[uri] = &OraConn{ 230 client: client, 231 callTimeout: c.callTimeout, 232 version: serverVersion, 233 lastTimeAccess: time.Now(), 234 ctx: ctx, 235 queryStorage: &c.queryStorage, 236 username: uri.User(), 237 } 238 239 log.Debugf("[%s] Created new connection: %s", pluginName, uri.Addr()) 240 241 return c.connections[uri], nil 242} 243 244// get returns a connection with given uri if it exists and also updates lastTimeAccess, otherwise returns nil. 245func (c *ConnManager) get(uri uri.URI) *OraConn { 246 c.connMutex.Lock() 247 defer c.connMutex.Unlock() 248 249 if conn, ok := c.connections[uri]; ok { 250 conn.updateAccessTime() 251 return conn 252 } 253 254 return nil 255} 256 257// GetConnection returns an existing connection or creates a new one. 258func (c *ConnManager) GetConnection(uri uri.URI) (conn *OraConn, err error) { 259 c.Lock() 260 defer c.Unlock() 261 262 conn = c.get(uri) 263 264 if conn == nil { 265 conn, err = c.create(uri) 266 } 267 268 if err != nil { 269 if oraErr, isOraErr := godror.AsOraErr(err); isOraErr { 270 err = zbxerr.ErrorConnectionFailed.Wrap(oraErr) 271 } else { 272 err = zbxerr.ErrorConnectionFailed.Wrap(err) 273 } 274 } 275 276 return 277} 278