1package beanstalk
2
3import (
4	"fmt"
5	"io"
6	"net"
7	"net/textproto"
8	"strings"
9	"time"
10)
11
// Connection defaults applied by Dial and DialTimeout.
const (
	// DefaultDialTimeout is how long Dial waits for a connection to the
	// beanstalk server to be established.
	DefaultDialTimeout = 10 * time.Second

	// DefaultKeepAlivePeriod is the interval between TCP keepalive
	// messages on connections opened by DialTimeout.
	DefaultKeepAlivePeriod = 10 * time.Second
)
17
// A Conn represents a connection to a beanstalkd server. It consists
// of a default Tube and TubeSet as well as the underlying network
// connection. The embedded types carry methods with them; see the
// documentation of those types for details.
type Conn struct {
	// c is the text-protocol wrapper around the underlying connection;
	// all commands are written to and all responses read from it.
	c       *textproto.Conn
	// used is the name of the tube most recently selected with a "use"
	// command on this connection.
	used    string
	// watched is the set of tube names this connection has most recently
	// asked the server to watch.
	watched map[string]bool
	Tube
	TubeSet
}
29
// Byte sequences shared by the protocol writer and the response parsers.
var (
	space      = []byte(" ")
	crnl       = []byte("\r\n")
	yamlHead   = []byte("---\n")
	nl         = []byte("\n")
	colonSpace = []byte(": ")
	minusSpace = []byte("- ")
)
38
39// NewConn returns a new Conn using conn for I/O.
40func NewConn(conn io.ReadWriteCloser) *Conn {
41	c := new(Conn)
42	c.c = textproto.NewConn(conn)
43	c.Tube = *NewTube(c, "default")
44	c.TubeSet = *NewTubeSet(c, "default")
45	c.used = "default"
46	c.watched = map[string]bool{"default": true}
47	return c
48}
49
// Dial connects to addr on the given network and returns a new Conn for
// the connection. It is shorthand for DialTimeout with a timeout of
// DefaultDialTimeout (10s).
func Dial(network, addr string) (*Conn, error) {
	return DialTimeout(network, addr, DefaultDialTimeout)
}
55
56// DialTimeout connects addr on the given network using net.DialTimeout
57// with a supplied timeout and then returns a new Conn for the connection.
58func DialTimeout(network, addr string, timeout time.Duration) (*Conn, error) {
59	dialer := &net.Dialer{
60		Timeout:   timeout,
61		KeepAlive: DefaultKeepAlivePeriod,
62	}
63	c, err := dialer.Dial(network, addr)
64	if err != nil {
65		return nil, err
66	}
67	return NewConn(c), nil
68}
69
// Close closes the underlying network connection and returns the error,
// if any, reported by that close.
func (c *Conn) Close() error {
	return c.c.Close()
}
74
75func (c *Conn) cmd(t *Tube, ts *TubeSet, body []byte, op string, args ...interface{}) (req, error) {
76	// negative dur checking
77	for _, arg := range args {
78		if d, _ := arg.(dur); d < 0 {
79			return req{}, fmt.Errorf("duration must be non-negative, got %v", time.Duration(d))
80		}
81	}
82
83	r := req{c.c.Next(), op}
84	c.c.StartRequest(r.id)
85	defer c.c.EndRequest(r.id)
86	err := c.adjustTubes(t, ts)
87	if err != nil {
88		return req{}, err
89	}
90	if body != nil {
91		args = append(args, len(body))
92	}
93	c.printLine(op, args...)
94	if body != nil {
95		c.c.W.Write(body)
96		c.c.W.Write(crnl)
97	}
98	err = c.c.W.Flush()
99	if err != nil {
100		return req{}, ConnError{c, op, err}
101	}
102	return r, nil
103}
104
105func (c *Conn) adjustTubes(t *Tube, ts *TubeSet) error {
106	if t != nil && t.Name != c.used {
107		if err := checkName(t.Name); err != nil {
108			return err
109		}
110		c.printLine("use", t.Name)
111		c.used = t.Name
112	}
113	if ts != nil {
114		for s := range ts.Name {
115			if !c.watched[s] {
116				if err := checkName(s); err != nil {
117					return err
118				}
119				c.printLine("watch", s)
120			}
121		}
122		for s := range c.watched {
123			if !ts.Name[s] {
124				c.printLine("ignore", s)
125			}
126		}
127		c.watched = make(map[string]bool)
128		for s := range ts.Name {
129			c.watched[s] = true
130		}
131	}
132	return nil
133}
134
135// does not flush
136func (c *Conn) printLine(cmd string, args ...interface{}) {
137	io.WriteString(c.c.W, cmd)
138	for _, a := range args {
139		c.c.W.Write(space)
140		fmt.Fprint(c.c.W, a)
141	}
142	c.c.W.Write(crnl)
143}
144
145func (c *Conn) readResp(r req, readBody bool, f string, a ...interface{}) (body []byte, err error) {
146	c.c.StartResponse(r.id)
147	defer c.c.EndResponse(r.id)
148	line, err := c.c.ReadLine()
149	for strings.HasPrefix(line, "WATCHING ") || strings.HasPrefix(line, "USING ") {
150		line, err = c.c.ReadLine()
151	}
152	if err != nil {
153		return nil, ConnError{c, r.op, err}
154	}
155	toScan := line
156	if readBody {
157		var size int
158		toScan, size, err = parseSize(toScan)
159		if err != nil {
160			return nil, ConnError{c, r.op, err}
161		}
162		body = make([]byte, size+2) // include trailing CR NL
163		_, err = io.ReadFull(c.c.R, body)
164		if err != nil {
165			return nil, ConnError{c, r.op, err}
166		}
167		body = body[:size] // exclude trailing CR NL
168	}
169
170	err = scan(toScan, f, a...)
171	if err != nil {
172		return nil, ConnError{c, r.op, err}
173	}
174	return body, nil
175}
176
177// Delete deletes the given job.
178func (c *Conn) Delete(id uint64) error {
179	r, err := c.cmd(nil, nil, nil, "delete", id)
180	if err != nil {
181		return err
182	}
183	_, err = c.readResp(r, false, "DELETED")
184	return err
185}
186
187// Release tells the server to perform the following actions:
188// set the priority of the given job to pri, remove it from the list of
189// jobs reserved by c, wait delay seconds, then place the job in the
190// ready queue, which makes it available for reservation by any client.
191func (c *Conn) Release(id uint64, pri uint32, delay time.Duration) error {
192	r, err := c.cmd(nil, nil, nil, "release", id, pri, dur(delay))
193	if err != nil {
194		return err
195	}
196	_, err = c.readResp(r, false, "RELEASED")
197	return err
198}
199
200// Bury places the given job in a holding area in the job's tube and
201// sets its priority to pri. The job will not be scheduled again until it
202// has been kicked; see also the documentation of Kick.
203func (c *Conn) Bury(id uint64, pri uint32) error {
204	r, err := c.cmd(nil, nil, nil, "bury", id, pri)
205	if err != nil {
206		return err
207	}
208	_, err = c.readResp(r, false, "BURIED")
209	return err
210}
211
212// KickJob places the given job to the ready queue of the same tube where it currently belongs
213// when the given job id exists and is in a buried or delayed state.
214func (c *Conn) KickJob(id uint64) error {
215	r, err := c.cmd(nil, nil, nil, "kick-job", id)
216	if err != nil {
217		return err
218	}
219	_, err = c.readResp(r, false, "KICKED")
220	return err
221}
222
223// Touch resets the reservation timer for the given job.
224// It is an error if the job isn't currently reserved by c.
225// See the documentation of Reserve for more details.
226func (c *Conn) Touch(id uint64) error {
227	r, err := c.cmd(nil, nil, nil, "touch", id)
228	if err != nil {
229		return err
230	}
231	_, err = c.readResp(r, false, "TOUCHED")
232	return err
233}
234
235// Peek gets a copy of the specified job from the server.
236func (c *Conn) Peek(id uint64) (body []byte, err error) {
237	r, err := c.cmd(nil, nil, nil, "peek", id)
238	if err != nil {
239		return nil, err
240	}
241	return c.readResp(r, true, "FOUND %d", &id)
242}
243
244// Stats retrieves global statistics from the server.
245func (c *Conn) Stats() (map[string]string, error) {
246	r, err := c.cmd(nil, nil, nil, "stats")
247	if err != nil {
248		return nil, err
249	}
250	body, err := c.readResp(r, true, "OK")
251	return parseDict(body), err
252}
253
254// StatsJob retrieves statistics about the given job.
255func (c *Conn) StatsJob(id uint64) (map[string]string, error) {
256	r, err := c.cmd(nil, nil, nil, "stats-job", id)
257	if err != nil {
258		return nil, err
259	}
260	body, err := c.readResp(r, true, "OK")
261	return parseDict(body), err
262}
263
264// ListTubes returns the names of the tubes that currently
265// exist on the server.
266func (c *Conn) ListTubes() ([]string, error) {
267	r, err := c.cmd(nil, nil, nil, "list-tubes")
268	if err != nil {
269		return nil, err
270	}
271	body, err := c.readResp(r, true, "OK")
272	return parseList(body), err
273}
274
275func scan(input, format string, a ...interface{}) error {
276	_, err := fmt.Sscanf(input, format, a...)
277	if err != nil {
278		return findRespError(input)
279	}
280	return nil
281}
282
// req identifies a command that has been sent on a Conn so that its
// response can be read later.
type req struct {
	// id is the sequence number obtained from the textproto connection's
	// Next method; it is passed to StartRequest/EndRequest and
	// StartResponse/EndResponse.
	id uint
	// op is the command name, recorded so errors can report which
	// operation failed.
	op string
}
287