1package zk
2
3/*
4TODO:
5* make sure a ping response comes back in a reasonable time
6
7Possible watcher events:
8* Event{Type: EventNotWatching, State: StateDisconnected, Path: path, Err: err}
9*/
10
11import (
12	"crypto/rand"
13	"encoding/binary"
14	"errors"
15	"fmt"
16	"io"
17	"log"
18	"net"
19	"strconv"
20	"strings"
21	"sync"
22	"sync/atomic"
23	"time"
24)
25
26var ErrNoServer = errors.New("zk: could not connect to a server")
27
28const (
29	bufferSize      = 1536 * 1024
30	eventChanSize   = 6
31	sendChanSize    = 16
32	protectedPrefix = "_c_"
33)
34
35type watchType int
36
37const (
38	watchTypeData  = iota
39	watchTypeExist = iota
40	watchTypeChild = iota
41)
42
43type watchPathType struct {
44	path  string
45	wType watchType
46}
47
48type Dialer func(network, address string, timeout time.Duration) (net.Conn, error)
49
50type Conn struct {
51	lastZxid  int64
52	sessionID int64
53	state     State // must be 32-bit aligned
54	xid       uint32
55	timeout   int32 // session timeout in milliseconds
56	passwd    []byte
57
58	dialer          Dialer
59	servers         []string
60	serverIndex     int // remember last server that was tried during connect to round-robin attempts to servers
61	lastServerIndex int // index of the last server that was successfully connected to and authenticated with
62	conn            net.Conn
63	eventChan       chan Event
64	shouldQuit      chan struct{}
65	pingInterval    time.Duration
66	recvTimeout     time.Duration
67	connectTimeout  time.Duration
68
69	sendChan     chan *request
70	requests     map[int32]*request // Xid -> pending request
71	requestsLock sync.Mutex
72	watchers     map[watchPathType][]chan Event
73	watchersLock sync.Mutex
74
75	// Debug (used by unit tests)
76	reconnectDelay time.Duration
77}
78
79type request struct {
80	xid        int32
81	opcode     int32
82	pkt        interface{}
83	recvStruct interface{}
84	recvChan   chan response
85
86	// Because sending and receiving happen in separate go routines, there's
87	// a possible race condition when creating watches from outside the read
88	// loop. We must ensure that a watcher gets added to the list synchronously
89	// with the response from the server on any request that creates a watch.
90	// In order to not hard code the watch logic for each opcode in the recv
91	// loop the caller can use recvFunc to insert some synchronously code
92	// after a response.
93	recvFunc func(*request, *responseHeader, error)
94}
95
96type response struct {
97	zxid int64
98	err  error
99}
100
101type Event struct {
102	Type   EventType
103	State  State
104	Path   string // For non-session events, the path of the watched node.
105	Err    error
106	Server string // For connection events
107}
108
109// Connect establishes a new connection to a pool of zookeeper servers
110// using the default net.Dialer. See ConnectWithDialer for further
111// information about session timeout.
112func Connect(servers []string, sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
113	return ConnectWithDialer(servers, sessionTimeout, nil)
114}
115
116// ConnectWithDialer establishes a new connection to a pool of zookeeper
117// servers. The provided session timeout sets the amount of time for which
118// a session is considered valid after losing connection to a server. Within
119// the session timeout it's possible to reestablish a connection to a different
120// server and keep the same session. This is means any ephemeral nodes and
121// watches are maintained.
122func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Dialer) (*Conn, <-chan Event, error) {
123	if len(servers) == 0 {
124		return nil, nil, errors.New("zk: server list must not be empty")
125	}
126
127	recvTimeout := sessionTimeout * 2 / 3
128
129	srvs := make([]string, len(servers))
130
131	for i, addr := range servers {
132		if strings.Contains(addr, ":") {
133			srvs[i] = addr
134		} else {
135			srvs[i] = addr + ":" + strconv.Itoa(DefaultPort)
136		}
137	}
138
139	// Randomize the order of the servers to avoid creating hotspots
140	stringShuffle(srvs)
141
142	ec := make(chan Event, eventChanSize)
143	if dialer == nil {
144		dialer = net.DialTimeout
145	}
146	conn := Conn{
147		dialer:          dialer,
148		servers:         srvs,
149		serverIndex:     0,
150		lastServerIndex: -1,
151		conn:            nil,
152		state:           StateDisconnected,
153		eventChan:       ec,
154		shouldQuit:      make(chan struct{}),
155		recvTimeout:     recvTimeout,
156		pingInterval:    recvTimeout / 2,
157		connectTimeout:  1 * time.Second,
158		sendChan:        make(chan *request, sendChanSize),
159		requests:        make(map[int32]*request),
160		watchers:        make(map[watchPathType][]chan Event),
161		passwd:          emptyPassword,
162		timeout:         int32(sessionTimeout.Nanoseconds() / 1e6),
163
164		// Debug
165		reconnectDelay: 0,
166	}
167	go func() {
168		conn.loop()
169		conn.flushRequests(ErrClosing)
170		conn.invalidateWatches(ErrClosing)
171		close(conn.eventChan)
172	}()
173	return &conn, ec, nil
174}
175
176func (c *Conn) Close() {
177	close(c.shouldQuit)
178
179	select {
180	case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
181	case <-time.After(time.Second):
182	}
183}
184
185func (c *Conn) State() State {
186	return State(atomic.LoadInt32((*int32)(&c.state)))
187}
188
189func (c *Conn) setState(state State) {
190	atomic.StoreInt32((*int32)(&c.state), int32(state))
191	select {
192	case c.eventChan <- Event{Type: EventSession, State: state, Server: c.servers[c.serverIndex]}:
193	default:
194		// panic("zk: event channel full - it must be monitored and never allowed to be full")
195	}
196}
197
198func (c *Conn) connect() error {
199	c.setState(StateConnecting)
200	for {
201		c.serverIndex = (c.serverIndex + 1) % len(c.servers)
202		if c.serverIndex == c.lastServerIndex {
203			c.flushUnsentRequests(ErrNoServer)
204			select {
205			case <-time.After(time.Second):
206				// pass
207			case <-c.shouldQuit:
208				c.setState(StateDisconnected)
209				c.flushUnsentRequests(ErrClosing)
210				return ErrClosing
211			}
212		} else if c.lastServerIndex < 0 {
213			// lastServerIndex defaults to -1 to avoid a delay on the initial connect
214			c.lastServerIndex = 0
215		}
216
217		zkConn, err := c.dialer("tcp", c.servers[c.serverIndex], c.connectTimeout)
218		if err == nil {
219			c.conn = zkConn
220			c.setState(StateConnected)
221			return nil
222		}
223
224		log.Printf("Failed to connect to %s: %+v", c.servers[c.serverIndex], err)
225	}
226}
227
228func (c *Conn) loop() {
229	for {
230		if err := c.connect(); err != nil {
231			// c.Close() was called
232			return
233		}
234
235		err := c.authenticate()
236		switch {
237		case err == ErrSessionExpired:
238			c.invalidateWatches(err)
239		case err != nil && c.conn != nil:
240			c.conn.Close()
241		case err == nil:
242			c.lastServerIndex = c.serverIndex
243			closeChan := make(chan struct{}) // channel to tell send loop stop
244			var wg sync.WaitGroup
245
246			wg.Add(1)
247			go func() {
248				c.sendLoop(c.conn, closeChan)
249				c.conn.Close() // causes recv loop to EOF/exit
250				wg.Done()
251			}()
252
253			wg.Add(1)
254			go func() {
255				err = c.recvLoop(c.conn)
256				if err == nil {
257					panic("zk: recvLoop should never return nil error")
258				}
259				close(closeChan) // tell send loop to exit
260				wg.Done()
261			}()
262
263			wg.Wait()
264		}
265
266		c.setState(StateDisconnected)
267
268		// Yeesh
269		if err != io.EOF && err != ErrSessionExpired && !strings.Contains(err.Error(), "use of closed network connection") {
270			log.Println(err)
271		}
272
273		select {
274		case <-c.shouldQuit:
275			c.flushRequests(ErrClosing)
276			return
277		default:
278		}
279
280		if err != ErrSessionExpired {
281			err = ErrConnectionClosed
282		}
283		c.flushRequests(err)
284
285		if c.reconnectDelay > 0 {
286			select {
287			case <-c.shouldQuit:
288				return
289			case <-time.After(c.reconnectDelay):
290			}
291		}
292	}
293}
294
295func (c *Conn) flushUnsentRequests(err error) {
296	for {
297		select {
298		default:
299			return
300		case req := <-c.sendChan:
301			req.recvChan <- response{-1, err}
302		}
303	}
304}
305
306// Send error to all pending requests and clear request map
307func (c *Conn) flushRequests(err error) {
308	c.requestsLock.Lock()
309	for _, req := range c.requests {
310		req.recvChan <- response{-1, err}
311	}
312	c.requests = make(map[int32]*request)
313	c.requestsLock.Unlock()
314}
315
316// Send error to all watchers and clear watchers map
317func (c *Conn) invalidateWatches(err error) {
318	c.watchersLock.Lock()
319	defer c.watchersLock.Unlock()
320
321	if len(c.watchers) >= 0 {
322		for pathType, watchers := range c.watchers {
323			ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: pathType.path, Err: err}
324			for _, ch := range watchers {
325				ch <- ev
326				close(ch)
327			}
328		}
329		c.watchers = make(map[watchPathType][]chan Event)
330	}
331}
332
333func (c *Conn) sendSetWatches() {
334	c.watchersLock.Lock()
335	defer c.watchersLock.Unlock()
336
337	if len(c.watchers) == 0 {
338		return
339	}
340
341	req := &setWatchesRequest{
342		RelativeZxid: c.lastZxid,
343		DataWatches:  make([]string, 0),
344		ExistWatches: make([]string, 0),
345		ChildWatches: make([]string, 0),
346	}
347	n := 0
348	for pathType, watchers := range c.watchers {
349		if len(watchers) == 0 {
350			continue
351		}
352		switch pathType.wType {
353		case watchTypeData:
354			req.DataWatches = append(req.DataWatches, pathType.path)
355		case watchTypeExist:
356			req.ExistWatches = append(req.ExistWatches, pathType.path)
357		case watchTypeChild:
358			req.ChildWatches = append(req.ChildWatches, pathType.path)
359		}
360		n++
361	}
362	if n == 0 {
363		return
364	}
365
366	go func() {
367		res := &setWatchesResponse{}
368		_, err := c.request(opSetWatches, req, res, nil)
369		if err != nil {
370			log.Printf("Failed to set previous watches: %s", err.Error())
371		}
372	}()
373}
374
375func (c *Conn) authenticate() error {
376	buf := make([]byte, 256)
377
378	// connect request
379
380	n, err := encodePacket(buf[4:], &connectRequest{
381		ProtocolVersion: protocolVersion,
382		LastZxidSeen:    c.lastZxid,
383		TimeOut:         c.timeout,
384		SessionID:       c.sessionID,
385		Passwd:          c.passwd,
386	})
387	if err != nil {
388		return err
389	}
390
391	binary.BigEndian.PutUint32(buf[:4], uint32(n))
392
393	c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10))
394	_, err = c.conn.Write(buf[:n+4])
395	c.conn.SetWriteDeadline(time.Time{})
396	if err != nil {
397		return err
398	}
399
400	c.sendSetWatches()
401
402	// connect response
403
404	// package length
405	c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10))
406	_, err = io.ReadFull(c.conn, buf[:4])
407	c.conn.SetReadDeadline(time.Time{})
408	if err != nil {
409		// Sometimes zookeeper just drops connection on invalid session data,
410		// we prefer to drop session and start from scratch when that event
411		// occurs instead of dropping into loop of connect/disconnect attempts
412		c.sessionID = 0
413		c.passwd = emptyPassword
414		c.lastZxid = 0
415		c.setState(StateExpired)
416		return ErrSessionExpired
417	}
418
419	blen := int(binary.BigEndian.Uint32(buf[:4]))
420	if cap(buf) < blen {
421		buf = make([]byte, blen)
422	}
423
424	_, err = io.ReadFull(c.conn, buf[:blen])
425	if err != nil {
426		return err
427	}
428
429	r := connectResponse{}
430	_, err = decodePacket(buf[:blen], &r)
431	if err != nil {
432		return err
433	}
434	if r.SessionID == 0 {
435		c.sessionID = 0
436		c.passwd = emptyPassword
437		c.lastZxid = 0
438		c.setState(StateExpired)
439		return ErrSessionExpired
440	}
441
442	if c.sessionID != r.SessionID {
443		atomic.StoreUint32(&c.xid, 0)
444	}
445	c.timeout = r.TimeOut
446	c.sessionID = r.SessionID
447	c.passwd = r.Passwd
448	c.setState(StateHasSession)
449
450	return nil
451}
452
453func (c *Conn) sendLoop(conn net.Conn, closeChan <-chan struct{}) error {
454	pingTicker := time.NewTicker(c.pingInterval)
455	defer pingTicker.Stop()
456
457	buf := make([]byte, bufferSize)
458	for {
459		select {
460		case req := <-c.sendChan:
461			header := &requestHeader{req.xid, req.opcode}
462			n, err := encodePacket(buf[4:], header)
463			if err != nil {
464				req.recvChan <- response{-1, err}
465				continue
466			}
467
468			n2, err := encodePacket(buf[4+n:], req.pkt)
469			if err != nil {
470				req.recvChan <- response{-1, err}
471				continue
472			}
473
474			n += n2
475
476			binary.BigEndian.PutUint32(buf[:4], uint32(n))
477
478			c.requestsLock.Lock()
479			select {
480			case <-closeChan:
481				req.recvChan <- response{-1, ErrConnectionClosed}
482				c.requestsLock.Unlock()
483				return ErrConnectionClosed
484			default:
485			}
486			c.requests[req.xid] = req
487			c.requestsLock.Unlock()
488
489			conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
490			_, err = conn.Write(buf[:n+4])
491			conn.SetWriteDeadline(time.Time{})
492			if err != nil {
493				req.recvChan <- response{-1, err}
494				conn.Close()
495				return err
496			}
497		case <-pingTicker.C:
498			n, err := encodePacket(buf[4:], &requestHeader{Xid: -2, Opcode: opPing})
499			if err != nil {
500				panic("zk: opPing should never fail to serialize")
501			}
502
503			binary.BigEndian.PutUint32(buf[:4], uint32(n))
504
505			conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
506			_, err = conn.Write(buf[:n+4])
507			conn.SetWriteDeadline(time.Time{})
508			if err != nil {
509				conn.Close()
510				return err
511			}
512		case <-closeChan:
513			return nil
514		}
515	}
516}
517
518func (c *Conn) recvLoop(conn net.Conn) error {
519	buf := make([]byte, bufferSize)
520	for {
521		// package length
522		conn.SetReadDeadline(time.Now().Add(c.recvTimeout))
523		_, err := io.ReadFull(conn, buf[:4])
524		if err != nil {
525			return err
526		}
527
528		blen := int(binary.BigEndian.Uint32(buf[:4]))
529		if cap(buf) < blen {
530			buf = make([]byte, blen)
531		}
532
533		_, err = io.ReadFull(conn, buf[:blen])
534		conn.SetReadDeadline(time.Time{})
535		if err != nil {
536			return err
537		}
538
539		res := responseHeader{}
540		_, err = decodePacket(buf[:16], &res)
541		if err != nil {
542			return err
543		}
544
545		if res.Xid == -1 {
546			res := &watcherEvent{}
547			_, err := decodePacket(buf[16:16+blen], res)
548			if err != nil {
549				return err
550			}
551			ev := Event{
552				Type:  res.Type,
553				State: res.State,
554				Path:  res.Path,
555				Err:   nil,
556			}
557			select {
558			case c.eventChan <- ev:
559			default:
560			}
561			wTypes := make([]watchType, 0, 2)
562			switch res.Type {
563			case EventNodeCreated:
564				wTypes = append(wTypes, watchTypeExist)
565			case EventNodeDeleted, EventNodeDataChanged:
566				wTypes = append(wTypes, watchTypeExist, watchTypeData, watchTypeChild)
567			case EventNodeChildrenChanged:
568				wTypes = append(wTypes, watchTypeChild)
569			}
570			c.watchersLock.Lock()
571			for _, t := range wTypes {
572				wpt := watchPathType{res.Path, t}
573				if watchers := c.watchers[wpt]; watchers != nil && len(watchers) > 0 {
574					for _, ch := range watchers {
575						ch <- ev
576						close(ch)
577					}
578					delete(c.watchers, wpt)
579				}
580			}
581			c.watchersLock.Unlock()
582		} else if res.Xid == -2 {
583			// Ping response. Ignore.
584		} else if res.Xid < 0 {
585			log.Printf("Xid < 0 (%d) but not ping or watcher event", res.Xid)
586		} else {
587			if res.Zxid > 0 {
588				c.lastZxid = res.Zxid
589			}
590
591			c.requestsLock.Lock()
592			req, ok := c.requests[res.Xid]
593			if ok {
594				delete(c.requests, res.Xid)
595			}
596			c.requestsLock.Unlock()
597
598			if !ok {
599				log.Printf("Response for unknown request with xid %d", res.Xid)
600			} else {
601				if res.Err != 0 {
602					err = res.Err.toError()
603				} else {
604					_, err = decodePacket(buf[16:16+blen], req.recvStruct)
605				}
606				if req.recvFunc != nil {
607					req.recvFunc(req, &res, err)
608				}
609				req.recvChan <- response{res.Zxid, err}
610				if req.opcode == opClose {
611					return io.EOF
612				}
613			}
614		}
615	}
616}
617
618func (c *Conn) nextXid() int32 {
619	return int32(atomic.AddUint32(&c.xid, 1) & 0x7fffffff)
620}
621
622func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
623	c.watchersLock.Lock()
624	defer c.watchersLock.Unlock()
625
626	ch := make(chan Event, 1)
627	wpt := watchPathType{path, watchType}
628	c.watchers[wpt] = append(c.watchers[wpt], ch)
629	return ch
630}
631
632func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
633	rq := &request{
634		xid:        c.nextXid(),
635		opcode:     opcode,
636		pkt:        req,
637		recvStruct: res,
638		recvChan:   make(chan response, 1),
639		recvFunc:   recvFunc,
640	}
641	c.sendChan <- rq
642	return rq.recvChan
643}
644
645func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
646	r := <-c.queueRequest(opcode, req, res, recvFunc)
647	return r.zxid, r.err
648}
649
650func (c *Conn) AddAuth(scheme string, auth []byte) error {
651	_, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{}, nil)
652	return err
653}
654
655func (c *Conn) Children(path string) ([]string, *Stat, error) {
656	res := &getChildren2Response{}
657	_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res, nil)
658	return res.Children, &res.Stat, err
659}
660
661func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
662	var ech <-chan Event
663	res := &getChildren2Response{}
664	_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
665		if err == nil {
666			ech = c.addWatcher(path, watchTypeChild)
667		}
668	})
669	if err != nil {
670		return nil, nil, nil, err
671	}
672	return res.Children, &res.Stat, ech, err
673}
674
675func (c *Conn) Get(path string) ([]byte, *Stat, error) {
676	res := &getDataResponse{}
677	_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res, nil)
678	return res.Data, &res.Stat, err
679}
680
681// GetW returns the contents of a znode and sets a watch
682func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
683	var ech <-chan Event
684	res := &getDataResponse{}
685	_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
686		if err == nil {
687			ech = c.addWatcher(path, watchTypeData)
688		}
689	})
690	if err != nil {
691		return nil, nil, nil, err
692	}
693	return res.Data, &res.Stat, ech, err
694}
695
696func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) {
697	res := &setDataResponse{}
698	_, err := c.request(opSetData, &SetDataRequest{path, data, version}, res, nil)
699	return &res.Stat, err
700}
701
702func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) {
703	res := &createResponse{}
704	_, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil)
705	return res.Path, err
706}
707
708// CreateProtectedEphemeralSequential fixes a race condition if the server crashes
709// after it creates the node. On reconnect the session may still be valid so the
710// ephemeral node still exists. Therefore, on reconnect we need to check if a node
711// with a GUID generated on create exists.
712func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error) {
713	var guid [16]byte
714	_, err := io.ReadFull(rand.Reader, guid[:16])
715	if err != nil {
716		return "", err
717	}
718	guidStr := fmt.Sprintf("%x", guid)
719
720	parts := strings.Split(path, "/")
721	parts[len(parts)-1] = fmt.Sprintf("%s%s-%s", protectedPrefix, guidStr, parts[len(parts)-1])
722	rootPath := strings.Join(parts[:len(parts)-1], "/")
723	protectedPath := strings.Join(parts, "/")
724
725	var newPath string
726	for i := 0; i < 3; i++ {
727		newPath, err = c.Create(protectedPath, data, FlagEphemeral|FlagSequence, acl)
728		switch err {
729		case ErrSessionExpired:
730			// No need to search for the node since it can't exist. Just try again.
731		case ErrConnectionClosed:
732			children, _, err := c.Children(rootPath)
733			if err != nil {
734				return "", err
735			}
736			for _, p := range children {
737				parts := strings.Split(p, "/")
738				if pth := parts[len(parts)-1]; strings.HasPrefix(pth, protectedPrefix) {
739					if g := pth[len(protectedPrefix) : len(protectedPrefix)+32]; g == guidStr {
740						return rootPath + "/" + p, nil
741					}
742				}
743			}
744		case nil:
745			return newPath, nil
746		default:
747			return "", err
748		}
749	}
750	return "", err
751}
752
753func (c *Conn) Delete(path string, version int32) error {
754	_, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{}, nil)
755	return err
756}
757
758func (c *Conn) Exists(path string) (bool, *Stat, error) {
759	res := &existsResponse{}
760	_, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res, nil)
761	exists := true
762	if err == ErrNoNode {
763		exists = false
764		err = nil
765	}
766	return exists, &res.Stat, err
767}
768
769func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) {
770	var ech <-chan Event
771	res := &existsResponse{}
772	_, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
773		if err == nil {
774			ech = c.addWatcher(path, watchTypeData)
775		} else if err == ErrNoNode {
776			ech = c.addWatcher(path, watchTypeExist)
777		}
778	})
779	exists := true
780	if err == ErrNoNode {
781		exists = false
782		err = nil
783	}
784	if err != nil {
785		return false, nil, nil, err
786	}
787	return exists, &res.Stat, ech, err
788}
789
790func (c *Conn) GetACL(path string) ([]ACL, *Stat, error) {
791	res := &getAclResponse{}
792	_, err := c.request(opGetAcl, &getAclRequest{Path: path}, res, nil)
793	return res.Acl, &res.Stat, err
794}
795
796func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) {
797	res := &setAclResponse{}
798	_, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res, nil)
799	return &res.Stat, err
800}
801
802func (c *Conn) Sync(path string) (string, error) {
803	res := &syncResponse{}
804	_, err := c.request(opSync, &syncRequest{Path: path}, res, nil)
805	return res.Path, err
806}
807
808type MultiResponse struct {
809	Stat   *Stat
810	String string
811}
812
813// Multi executes multiple ZooKeeper operations or none of them. The provided
814// ops must be one of *CreateRequest, *DeleteRequest, *SetDataRequest, or
815// *CheckVersionRequest.
816func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
817	req := &multiRequest{
818		Ops:        make([]multiRequestOp, 0, len(ops)),
819		DoneHeader: multiHeader{Type: -1, Done: true, Err: -1},
820	}
821	for _, op := range ops {
822		var opCode int32
823		switch op.(type) {
824		case *CreateRequest:
825			opCode = opCreate
826		case *SetDataRequest:
827			opCode = opSetData
828		case *DeleteRequest:
829			opCode = opDelete
830		case *CheckVersionRequest:
831			opCode = opCheck
832		default:
833			return nil, fmt.Errorf("uknown operation type %T", op)
834		}
835		req.Ops = append(req.Ops, multiRequestOp{multiHeader{opCode, false, -1}, op})
836	}
837	res := &multiResponse{}
838	_, err := c.request(opMulti, req, res, nil)
839	mr := make([]MultiResponse, len(res.Ops))
840	for i, op := range res.Ops {
841		mr[i] = MultiResponse{Stat: op.Stat, String: op.String}
842	}
843	return mr, err
844}
845