// Package thrsafe is a thread-safe engine for MyMySQL.
//
// In contrast to the native engine:
// - one connection can be used by multiple goroutines,
// - if the connection is idle, pings are sent to the server (once per minute)
//   to avoid a timeout.
//
// See the documentation of mymysql/native for details.
9package thrsafe
10
11import (
12	"github.com/ziutek/mymysql/mysql"
13	_ "github.com/ziutek/mymysql/native"
14	"io"
15	"sync"
16	"time"
17)
18
19type Conn struct {
20	mysql.Conn
21	mutex *sync.Mutex
22
23	stopPinger chan struct{}
24	lastUsed   time.Time
25}
26
27func (c *Conn) lock() {
28	//log.Println(c, ":: lock @", c.mutex)
29	c.mutex.Lock()
30}
31
32func (c *Conn) unlock() {
33	//log.Println(c, ":: unlock @", c.mutex)
34	c.lastUsed = time.Now()
35	c.mutex.Unlock()
36}
37
38type Result struct {
39	mysql.Result
40	conn *Conn
41}
42
43type Stmt struct {
44	mysql.Stmt
45	conn *Conn
46}
47
48type Transaction struct {
49	*Conn
50	conn *Conn
51}
52
53func New(proto, laddr, raddr, user, passwd string, db ...string) mysql.Conn {
54	return &Conn{
55		Conn:  orgNew(proto, laddr, raddr, user, passwd, db...),
56		mutex: new(sync.Mutex),
57	}
58}
59
60func (c *Conn) Clone() mysql.Conn {
61	return &Conn{
62		Conn:  c.Conn.Clone(),
63		mutex: new(sync.Mutex),
64	}
65}
66
67func (c *Conn) pinger() {
68	const to = 60 * time.Second
69	sleep := to
70	for {
71		timer := time.After(sleep)
72		select {
73		case <-c.stopPinger:
74			return
75		case t := <-timer:
76			sleep := to - t.Sub(c.lastUsed)
77			if sleep <= 0 {
78				if c.Ping() != nil {
79					return
80				}
81				sleep = to
82			}
83		}
84	}
85}
86
87func (c *Conn) Connect() error {
88	//log.Println("Connect")
89	c.lock()
90	defer c.unlock()
91	c.stopPinger = make(chan struct{})
92	go c.pinger()
93	return c.Conn.Connect()
94}
95
96func (c *Conn) Close() error {
97	//log.Println("Close")
98	close(c.stopPinger) // Stop pinger before lock connection
99	c.lock()
100	defer c.unlock()
101	return c.Conn.Close()
102}
103
104func (c *Conn) Reconnect() error {
105	//log.Println("Reconnect")
106	c.lock()
107	defer c.unlock()
108	if c.stopPinger == nil {
109		go c.pinger()
110	}
111	return c.Conn.Reconnect()
112}
113
114func (c *Conn) Use(dbname string) error {
115	//log.Println("Use")
116	c.lock()
117	defer c.unlock()
118	return c.Conn.Use(dbname)
119}
120
121func (c *Conn) Start(sql string, params ...interface{}) (mysql.Result, error) {
122	//log.Println("Start")
123	c.lock()
124	res, err := c.Conn.Start(sql, params...)
125	// Unlock if error or OK result (which doesn't provide any fields)
126	if err != nil {
127		c.unlock()
128		return nil, err
129	}
130	if res.StatusOnly() && !res.MoreResults() {
131		c.unlock()
132	}
133	return &Result{Result: res, conn: c}, err
134}
135
136func (res *Result) ScanRow(row mysql.Row) error {
137	//log.Println("ScanRow")
138	err := res.Result.ScanRow(row)
139	if err == nil {
140		// There are more rows to read
141		return nil
142	}
143	if err == mysql.ErrReadAfterEOR {
144		// Trying read after EOR - connection unlocked before
145		return err
146	}
147	if err != io.EOF || !res.StatusOnly() && !res.MoreResults() {
148		// Error or no more rows in not empty result set and no more resutls.
149		// In case if empty result set and no more resutls Start has unlocked
150		// it before.
151		res.conn.unlock()
152	}
153	return err
154}
155
156func (res *Result) GetRow() (mysql.Row, error) {
157	return mysql.GetRow(res)
158}
159
160func (res *Result) NextResult() (mysql.Result, error) {
161	//log.Println("NextResult")
162	next, err := res.Result.NextResult()
163	if err != nil {
164		return nil, err
165	}
166	if next == nil {
167		return nil, nil
168	}
169	if next.StatusOnly() && !next.MoreResults() {
170		res.conn.unlock()
171	}
172	return &Result{next, res.conn}, nil
173}
174
175func (c *Conn) Ping() error {
176	c.lock()
177	defer c.unlock()
178	return c.Conn.Ping()
179}
180
181func (c *Conn) Prepare(sql string) (mysql.Stmt, error) {
182	//log.Println("Prepare")
183	c.lock()
184	defer c.unlock()
185	stmt, err := c.Conn.Prepare(sql)
186	if err != nil {
187		return nil, err
188	}
189	return &Stmt{Stmt: stmt, conn: c}, nil
190}
191
192func (stmt *Stmt) Run(params ...interface{}) (mysql.Result, error) {
193	//log.Println("Run")
194	stmt.conn.lock()
195	res, err := stmt.Stmt.Run(params...)
196	// Unlock if error or OK result (which doesn't provide any fields)
197	if err != nil {
198		stmt.conn.unlock()
199		return nil, err
200	}
201	if res.StatusOnly() && !res.MoreResults() {
202		stmt.conn.unlock()
203	}
204	return &Result{Result: res, conn: stmt.conn}, nil
205}
206
207func (stmt *Stmt) Delete() error {
208	//log.Println("Delete")
209	stmt.conn.lock()
210	defer stmt.conn.unlock()
211	return stmt.Stmt.Delete()
212}
213
214func (stmt *Stmt) Reset() error {
215	//log.Println("Reset")
216	stmt.conn.lock()
217	defer stmt.conn.unlock()
218	return stmt.Stmt.Reset()
219}
220
221func (stmt *Stmt) SendLongData(pnum int, data interface{}, pkt_size int) error {
222	//log.Println("SendLongData")
223	stmt.conn.lock()
224	defer stmt.conn.unlock()
225	return stmt.Stmt.SendLongData(pnum, data, pkt_size)
226}
227
228// See mysql.Query
229func (c *Conn) Query(sql string, params ...interface{}) ([]mysql.Row, mysql.Result, error) {
230	return mysql.Query(c, sql, params...)
231}
232
233// See mysql.QueryFirst
234func (my *Conn) QueryFirst(sql string, params ...interface{}) (mysql.Row, mysql.Result, error) {
235	return mysql.QueryFirst(my, sql, params...)
236}
237
238// See mysql.QueryLast
239func (my *Conn) QueryLast(sql string, params ...interface{}) (mysql.Row, mysql.Result, error) {
240	return mysql.QueryLast(my, sql, params...)
241}
242
243// See mysql.Exec
244func (stmt *Stmt) Exec(params ...interface{}) ([]mysql.Row, mysql.Result, error) {
245	return mysql.Exec(stmt, params...)
246}
247
248// See mysql.ExecFirst
249func (stmt *Stmt) ExecFirst(params ...interface{}) (mysql.Row, mysql.Result, error) {
250	return mysql.ExecFirst(stmt, params...)
251}
252
253// See mysql.ExecLast
254func (stmt *Stmt) ExecLast(params ...interface{}) (mysql.Row, mysql.Result, error) {
255	return mysql.ExecLast(stmt, params...)
256}
257
258// See mysql.End
259func (res *Result) End() error {
260	return mysql.End(res)
261}
262
263// See mysql.GetFirstRow
264func (res *Result) GetFirstRow() (mysql.Row, error) {
265	return mysql.GetFirstRow(res)
266}
267
268// See mysql.GetLastRow
269func (res *Result) GetLastRow() (mysql.Row, error) {
270	return mysql.GetLastRow(res)
271}
272
273// See mysql.GetRows
274func (res *Result) GetRows() ([]mysql.Row, error) {
275	return mysql.GetRows(res)
276}
277
278// Begins a new transaction. No any other thread can send command on this
279// connection until Commit or Rollback will be called.
280// Periodical pinging the server is disabled during transaction.
281
282func (c *Conn) Begin() (mysql.Transaction, error) {
283	//log.Println("Begin")
284	c.lock()
285	tr := Transaction{
286		&Conn{Conn: c.Conn, mutex: new(sync.Mutex)},
287		c,
288	}
289	_, err := c.Conn.Start("START TRANSACTION")
290	if err != nil {
291		c.unlock()
292		return nil, err
293	}
294	return &tr, nil
295}
296
297func (tr *Transaction) end(cr string) error {
298	tr.lock()
299	_, err := tr.conn.Conn.Start(cr)
300	tr.conn.unlock()
301	// Invalidate this transaction
302	m := tr.Conn.mutex
303	tr.Conn = nil
304	tr.conn = nil
305	m.Unlock() // One goorutine which still uses this transaction will panic
306	return err
307}
308
309func (tr *Transaction) Commit() error {
310	//log.Println("Commit")
311	return tr.end("COMMIT")
312}
313
314func (tr *Transaction) Rollback() error {
315	//log.Println("Rollback")
316	return tr.end("ROLLBACK")
317}
318
319func (tr *Transaction) IsValid() bool {
320	return tr.Conn != nil
321}
322
323func (tr *Transaction) Do(st mysql.Stmt) mysql.Stmt {
324	if s, ok := st.(*Stmt); ok && s.conn == tr.conn {
325		// Returns new statement which uses statement mutexes
326		return &Stmt{s.Stmt, tr.Conn}
327	}
328	panic("Transaction and statement doesn't belong to the same connection")
329}
330
331var orgNew func(proto, laddr, raddr, user, passwd string, db ...string) mysql.Conn
332
333func init() {
334	orgNew = mysql.New
335	mysql.New = New
336}
337