1// Thread safe engine for MyMySQL 2// 3// In contrast to native engine: 4// - one connection can be used by multiple gorutines, 5// - if connection is idle pings are sent to the server (once per minute) to 6// avoid timeout. 7// 8// See documentation of mymysql/native for details 9package thrsafe 10 11import ( 12 "github.com/ziutek/mymysql/mysql" 13 _ "github.com/ziutek/mymysql/native" 14 "io" 15 "sync" 16 "time" 17) 18 19type Conn struct { 20 mysql.Conn 21 mutex *sync.Mutex 22 23 stopPinger chan struct{} 24 lastUsed time.Time 25} 26 27func (c *Conn) lock() { 28 //log.Println(c, ":: lock @", c.mutex) 29 c.mutex.Lock() 30} 31 32func (c *Conn) unlock() { 33 //log.Println(c, ":: unlock @", c.mutex) 34 c.lastUsed = time.Now() 35 c.mutex.Unlock() 36} 37 38type Result struct { 39 mysql.Result 40 conn *Conn 41} 42 43type Stmt struct { 44 mysql.Stmt 45 conn *Conn 46} 47 48type Transaction struct { 49 *Conn 50 conn *Conn 51} 52 53func New(proto, laddr, raddr, user, passwd string, db ...string) mysql.Conn { 54 return &Conn{ 55 Conn: orgNew(proto, laddr, raddr, user, passwd, db...), 56 mutex: new(sync.Mutex), 57 } 58} 59 60func (c *Conn) Clone() mysql.Conn { 61 return &Conn{ 62 Conn: c.Conn.Clone(), 63 mutex: new(sync.Mutex), 64 } 65} 66 67func (c *Conn) pinger() { 68 const to = 60 * time.Second 69 sleep := to 70 for { 71 timer := time.After(sleep) 72 select { 73 case <-c.stopPinger: 74 return 75 case t := <-timer: 76 sleep := to - t.Sub(c.lastUsed) 77 if sleep <= 0 { 78 if c.Ping() != nil { 79 return 80 } 81 sleep = to 82 } 83 } 84 } 85} 86 87func (c *Conn) Connect() error { 88 //log.Println("Connect") 89 c.lock() 90 defer c.unlock() 91 c.stopPinger = make(chan struct{}) 92 go c.pinger() 93 return c.Conn.Connect() 94} 95 96func (c *Conn) Close() error { 97 //log.Println("Close") 98 close(c.stopPinger) // Stop pinger before lock connection 99 c.lock() 100 defer c.unlock() 101 return c.Conn.Close() 102} 103 104func (c *Conn) Reconnect() error { 105 //log.Println("Reconnect") 106 c.lock() 107 defer c.unlock() 108 if c.stopPinger == nil { 109 go c.pinger() 110 } 111 return c.Conn.Reconnect() 112} 113 114func (c *Conn) Use(dbname string) error { 115 //log.Println("Use") 116 c.lock() 117 defer c.unlock() 118 return c.Conn.Use(dbname) 119} 120 121func (c *Conn) Start(sql string, params ...interface{}) (mysql.Result, error) { 122 //log.Println("Start") 123 c.lock() 124 res, err := c.Conn.Start(sql, params...) 125 // Unlock if error or OK result (which doesn't provide any fields) 126 if err != nil { 127 c.unlock() 128 return nil, err 129 } 130 if res.StatusOnly() && !res.MoreResults() { 131 c.unlock() 132 } 133 return &Result{Result: res, conn: c}, err 134} 135 136func (res *Result) ScanRow(row mysql.Row) error { 137 //log.Println("ScanRow") 138 err := res.Result.ScanRow(row) 139 if err == nil { 140 // There are more rows to read 141 return nil 142 } 143 if err == mysql.ErrReadAfterEOR { 144 // Trying read after EOR - connection unlocked before 145 return err 146 } 147 if err != io.EOF || !res.StatusOnly() && !res.MoreResults() { 148 // Error or no more rows in not empty result set and no more resutls. 149 // In case if empty result set and no more resutls Start has unlocked 150 // it before. 151 res.conn.unlock() 152 } 153 return err 154} 155 156func (res *Result) GetRow() (mysql.Row, error) { 157 return mysql.GetRow(res) 158} 159 160func (res *Result) NextResult() (mysql.Result, error) { 161 //log.Println("NextResult") 162 next, err := res.Result.NextResult() 163 if err != nil { 164 return nil, err 165 } 166 if next == nil { 167 return nil, nil 168 } 169 if next.StatusOnly() && !next.MoreResults() { 170 res.conn.unlock() 171 } 172 return &Result{next, res.conn}, nil 173} 174 175func (c *Conn) Ping() error { 176 c.lock() 177 defer c.unlock() 178 return c.Conn.Ping() 179} 180 181func (c *Conn) Prepare(sql string) (mysql.Stmt, error) { 182 //log.Println("Prepare") 183 c.lock() 184 defer c.unlock() 185 stmt, err := c.Conn.Prepare(sql) 186 if err != nil { 187 return nil, err 188 } 189 return &Stmt{Stmt: stmt, conn: c}, nil 190} 191 192func (stmt *Stmt) Run(params ...interface{}) (mysql.Result, error) { 193 //log.Println("Run") 194 stmt.conn.lock() 195 res, err := stmt.Stmt.Run(params...) 196 // Unlock if error or OK result (which doesn't provide any fields) 197 if err != nil { 198 stmt.conn.unlock() 199 return nil, err 200 } 201 if res.StatusOnly() && !res.MoreResults() { 202 stmt.conn.unlock() 203 } 204 return &Result{Result: res, conn: stmt.conn}, nil 205} 206 207func (stmt *Stmt) Delete() error { 208 //log.Println("Delete") 209 stmt.conn.lock() 210 defer stmt.conn.unlock() 211 return stmt.Stmt.Delete() 212} 213 214func (stmt *Stmt) Reset() error { 215 //log.Println("Reset") 216 stmt.conn.lock() 217 defer stmt.conn.unlock() 218 return stmt.Stmt.Reset() 219} 220 221func (stmt *Stmt) SendLongData(pnum int, data interface{}, pkt_size int) error { 222 //log.Println("SendLongData") 223 stmt.conn.lock() 224 defer stmt.conn.unlock() 225 return stmt.Stmt.SendLongData(pnum, data, pkt_size) 226} 227 228// See mysql.Query 229func (c *Conn) Query(sql string, params ...interface{}) ([]mysql.Row, mysql.Result, error) { 230 return mysql.Query(c, sql, params...) 231} 232 233// See mysql.QueryFirst 234func (my *Conn) QueryFirst(sql string, params ...interface{}) (mysql.Row, mysql.Result, error) { 235 return mysql.QueryFirst(my, sql, params...) 236} 237 238// See mysql.QueryLast 239func (my *Conn) QueryLast(sql string, params ...interface{}) (mysql.Row, mysql.Result, error) { 240 return mysql.QueryLast(my, sql, params...) 241} 242 243// See mysql.Exec 244func (stmt *Stmt) Exec(params ...interface{}) ([]mysql.Row, mysql.Result, error) { 245 return mysql.Exec(stmt, params...) 246} 247 248// See mysql.ExecFirst 249func (stmt *Stmt) ExecFirst(params ...interface{}) (mysql.Row, mysql.Result, error) { 250 return mysql.ExecFirst(stmt, params...) 251} 252 253// See mysql.ExecLast 254func (stmt *Stmt) ExecLast(params ...interface{}) (mysql.Row, mysql.Result, error) { 255 return mysql.ExecLast(stmt, params...) 256} 257 258// See mysql.End 259func (res *Result) End() error { 260 return mysql.End(res) 261} 262 263// See mysql.GetFirstRow 264func (res *Result) GetFirstRow() (mysql.Row, error) { 265 return mysql.GetFirstRow(res) 266} 267 268// See mysql.GetLastRow 269func (res *Result) GetLastRow() (mysql.Row, error) { 270 return mysql.GetLastRow(res) 271} 272 273// See mysql.GetRows 274func (res *Result) GetRows() ([]mysql.Row, error) { 275 return mysql.GetRows(res) 276} 277 278// Begins a new transaction. No any other thread can send command on this 279// connection until Commit or Rollback will be called. 280// Periodical pinging the server is disabled during transaction. 281 282func (c *Conn) Begin() (mysql.Transaction, error) { 283 //log.Println("Begin") 284 c.lock() 285 tr := Transaction{ 286 &Conn{Conn: c.Conn, mutex: new(sync.Mutex)}, 287 c, 288 } 289 _, err := c.Conn.Start("START TRANSACTION") 290 if err != nil { 291 c.unlock() 292 return nil, err 293 } 294 return &tr, nil 295} 296 297func (tr *Transaction) end(cr string) error { 298 tr.lock() 299 _, err := tr.conn.Conn.Start(cr) 300 tr.conn.unlock() 301 // Invalidate this transaction 302 m := tr.Conn.mutex 303 tr.Conn = nil 304 tr.conn = nil 305 m.Unlock() // One goorutine which still uses this transaction will panic 306 return err 307} 308 309func (tr *Transaction) Commit() error { 310 //log.Println("Commit") 311 return tr.end("COMMIT") 312} 313 314func (tr *Transaction) Rollback() error { 315 //log.Println("Rollback") 316 return tr.end("ROLLBACK") 317} 318 319func (tr *Transaction) IsValid() bool { 320 return tr.Conn != nil 321} 322 323func (tr *Transaction) Do(st mysql.Stmt) mysql.Stmt { 324 if s, ok := st.(*Stmt); ok && s.conn == tr.conn { 325 // Returns new statement which uses statement mutexes 326 return &Stmt{s.Stmt, tr.Conn} 327 } 328 panic("Transaction and statement doesn't belong to the same connection") 329} 330 331var orgNew func(proto, laddr, raddr, user, passwd string, db ...string) mysql.Conn 332 333func init() { 334 orgNew = mysql.New 335 mysql.New = New 336} 337