1package beanstalk 2 3import ( 4 "fmt" 5 "io" 6 "net" 7 "net/textproto" 8 "strings" 9 "time" 10) 11 12// DefaultDialTimeout is the time to wait for a connection to the beanstalk server. 13const DefaultDialTimeout = 10 * time.Second 14 15// DefaultKeepAlivePeriod is the default period between TCP keepalive messages. 16const DefaultKeepAlivePeriod = 10 * time.Second 17 18// A Conn represents a connection to a beanstalkd server. It consists 19// of a default Tube and TubeSet as well as the underlying network 20// connection. The embedded types carry methods with them; see the 21// documentation of those types for details. 22type Conn struct { 23 c *textproto.Conn 24 used string 25 watched map[string]bool 26 Tube 27 TubeSet 28} 29 30var ( 31 space = []byte{' '} 32 crnl = []byte{'\r', '\n'} 33 yamlHead = []byte{'-', '-', '-', '\n'} 34 nl = []byte{'\n'} 35 colonSpace = []byte{':', ' '} 36 minusSpace = []byte{'-', ' '} 37) 38 39// NewConn returns a new Conn using conn for I/O. 40func NewConn(conn io.ReadWriteCloser) *Conn { 41 c := new(Conn) 42 c.c = textproto.NewConn(conn) 43 c.Tube = *NewTube(c, "default") 44 c.TubeSet = *NewTubeSet(c, "default") 45 c.used = "default" 46 c.watched = map[string]bool{"default": true} 47 return c 48} 49 50// Dial connects addr on the given network using net.DialTimeout 51// with a default timeout of 10s and then returns a new Conn for the connection. 52func Dial(network, addr string) (*Conn, error) { 53 return DialTimeout(network, addr, DefaultDialTimeout) 54} 55 56// DialTimeout connects addr on the given network using net.DialTimeout 57// with a supplied timeout and then returns a new Conn for the connection. 58func DialTimeout(network, addr string, timeout time.Duration) (*Conn, error) { 59 dialer := &net.Dialer{ 60 Timeout: timeout, 61 KeepAlive: DefaultKeepAlivePeriod, 62 } 63 c, err := dialer.Dial(network, addr) 64 if err != nil { 65 return nil, err 66 } 67 return NewConn(c), nil 68} 69 70// Close closes the underlying network connection. 71func (c *Conn) Close() error { 72 return c.c.Close() 73} 74 75func (c *Conn) cmd(t *Tube, ts *TubeSet, body []byte, op string, args ...interface{}) (req, error) { 76 // negative dur checking 77 for _, arg := range args { 78 if d, _ := arg.(dur); d < 0 { 79 return req{}, fmt.Errorf("duration must be non-negative, got %v", time.Duration(d)) 80 } 81 } 82 83 r := req{c.c.Next(), op} 84 c.c.StartRequest(r.id) 85 defer c.c.EndRequest(r.id) 86 err := c.adjustTubes(t, ts) 87 if err != nil { 88 return req{}, err 89 } 90 if body != nil { 91 args = append(args, len(body)) 92 } 93 c.printLine(op, args...) 94 if body != nil { 95 c.c.W.Write(body) 96 c.c.W.Write(crnl) 97 } 98 err = c.c.W.Flush() 99 if err != nil { 100 return req{}, ConnError{c, op, err} 101 } 102 return r, nil 103} 104 105func (c *Conn) adjustTubes(t *Tube, ts *TubeSet) error { 106 if t != nil && t.Name != c.used { 107 if err := checkName(t.Name); err != nil { 108 return err 109 } 110 c.printLine("use", t.Name) 111 c.used = t.Name 112 } 113 if ts != nil { 114 for s := range ts.Name { 115 if !c.watched[s] { 116 if err := checkName(s); err != nil { 117 return err 118 } 119 c.printLine("watch", s) 120 } 121 } 122 for s := range c.watched { 123 if !ts.Name[s] { 124 c.printLine("ignore", s) 125 } 126 } 127 c.watched = make(map[string]bool) 128 for s := range ts.Name { 129 c.watched[s] = true 130 } 131 } 132 return nil 133} 134 135// does not flush 136func (c *Conn) printLine(cmd string, args ...interface{}) { 137 io.WriteString(c.c.W, cmd) 138 for _, a := range args { 139 c.c.W.Write(space) 140 fmt.Fprint(c.c.W, a) 141 } 142 c.c.W.Write(crnl) 143} 144 145func (c *Conn) readResp(r req, readBody bool, f string, a ...interface{}) (body []byte, err error) { 146 c.c.StartResponse(r.id) 147 defer c.c.EndResponse(r.id) 148 line, err := c.c.ReadLine() 149 for strings.HasPrefix(line, "WATCHING ") || strings.HasPrefix(line, "USING ") { 150 line, err = c.c.ReadLine() 151 } 152 if err != nil { 153 return nil, ConnError{c, r.op, err} 154 } 155 toScan := line 156 if readBody { 157 var size int 158 toScan, size, err = parseSize(toScan) 159 if err != nil { 160 return nil, ConnError{c, r.op, err} 161 } 162 body = make([]byte, size+2) // include trailing CR NL 163 _, err = io.ReadFull(c.c.R, body) 164 if err != nil { 165 return nil, ConnError{c, r.op, err} 166 } 167 body = body[:size] // exclude trailing CR NL 168 } 169 170 err = scan(toScan, f, a...) 171 if err != nil { 172 return nil, ConnError{c, r.op, err} 173 } 174 return body, nil 175} 176 177// Delete deletes the given job. 178func (c *Conn) Delete(id uint64) error { 179 r, err := c.cmd(nil, nil, nil, "delete", id) 180 if err != nil { 181 return err 182 } 183 _, err = c.readResp(r, false, "DELETED") 184 return err 185} 186 187// Release tells the server to perform the following actions: 188// set the priority of the given job to pri, remove it from the list of 189// jobs reserved by c, wait delay seconds, then place the job in the 190// ready queue, which makes it available for reservation by any client. 191func (c *Conn) Release(id uint64, pri uint32, delay time.Duration) error { 192 r, err := c.cmd(nil, nil, nil, "release", id, pri, dur(delay)) 193 if err != nil { 194 return err 195 } 196 _, err = c.readResp(r, false, "RELEASED") 197 return err 198} 199 200// Bury places the given job in a holding area in the job's tube and 201// sets its priority to pri. The job will not be scheduled again until it 202// has been kicked; see also the documentation of Kick. 203func (c *Conn) Bury(id uint64, pri uint32) error { 204 r, err := c.cmd(nil, nil, nil, "bury", id, pri) 205 if err != nil { 206 return err 207 } 208 _, err = c.readResp(r, false, "BURIED") 209 return err 210} 211 212// KickJob places the given job to the ready queue of the same tube where it currently belongs 213// when the given job id exists and is in a buried or delayed state. 214func (c *Conn) KickJob(id uint64) error { 215 r, err := c.cmd(nil, nil, nil, "kick-job", id) 216 if err != nil { 217 return err 218 } 219 _, err = c.readResp(r, false, "KICKED") 220 return err 221} 222 223// Touch resets the reservation timer for the given job. 224// It is an error if the job isn't currently reserved by c. 225// See the documentation of Reserve for more details. 226func (c *Conn) Touch(id uint64) error { 227 r, err := c.cmd(nil, nil, nil, "touch", id) 228 if err != nil { 229 return err 230 } 231 _, err = c.readResp(r, false, "TOUCHED") 232 return err 233} 234 235// Peek gets a copy of the specified job from the server. 236func (c *Conn) Peek(id uint64) (body []byte, err error) { 237 r, err := c.cmd(nil, nil, nil, "peek", id) 238 if err != nil { 239 return nil, err 240 } 241 return c.readResp(r, true, "FOUND %d", &id) 242} 243 244// Stats retrieves global statistics from the server. 245func (c *Conn) Stats() (map[string]string, error) { 246 r, err := c.cmd(nil, nil, nil, "stats") 247 if err != nil { 248 return nil, err 249 } 250 body, err := c.readResp(r, true, "OK") 251 return parseDict(body), err 252} 253 254// StatsJob retrieves statistics about the given job. 255func (c *Conn) StatsJob(id uint64) (map[string]string, error) { 256 r, err := c.cmd(nil, nil, nil, "stats-job", id) 257 if err != nil { 258 return nil, err 259 } 260 body, err := c.readResp(r, true, "OK") 261 return parseDict(body), err 262} 263 264// ListTubes returns the names of the tubes that currently 265// exist on the server. 266func (c *Conn) ListTubes() ([]string, error) { 267 r, err := c.cmd(nil, nil, nil, "list-tubes") 268 if err != nil { 269 return nil, err 270 } 271 body, err := c.readResp(r, true, "OK") 272 return parseList(body), err 273} 274 275func scan(input, format string, a ...interface{}) error { 276 _, err := fmt.Sscanf(input, format, a...) 277 if err != nil { 278 return findRespError(input) 279 } 280 return nil 281} 282 283type req struct { 284 id uint 285 op string 286} 287