1package zk 2 3/* 4TODO: 5* make sure a ping response comes back in a reasonable time 6 7Possible watcher events: 8* Event{Type: EventNotWatching, State: StateDisconnected, Path: path, Err: err} 9*/ 10 11import ( 12 "crypto/rand" 13 "encoding/binary" 14 "errors" 15 "fmt" 16 "io" 17 "log" 18 "net" 19 "strconv" 20 "strings" 21 "sync" 22 "sync/atomic" 23 "time" 24) 25 26var ErrNoServer = errors.New("zk: could not connect to a server") 27 28const ( 29 bufferSize = 1536 * 1024 30 eventChanSize = 6 31 sendChanSize = 16 32 protectedPrefix = "_c_" 33) 34 35type watchType int 36 37const ( 38 watchTypeData = iota 39 watchTypeExist = iota 40 watchTypeChild = iota 41) 42 43type watchPathType struct { 44 path string 45 wType watchType 46} 47 48type Dialer func(network, address string, timeout time.Duration) (net.Conn, error) 49 50type Conn struct { 51 lastZxid int64 52 sessionID int64 53 state State // must be 32-bit aligned 54 xid uint32 55 timeout int32 // session timeout in milliseconds 56 passwd []byte 57 58 dialer Dialer 59 servers []string 60 serverIndex int // remember last server that was tried during connect to round-robin attempts to servers 61 lastServerIndex int // index of the last server that was successfully connected to and authenticated with 62 conn net.Conn 63 eventChan chan Event 64 shouldQuit chan struct{} 65 pingInterval time.Duration 66 recvTimeout time.Duration 67 connectTimeout time.Duration 68 69 sendChan chan *request 70 requests map[int32]*request // Xid -> pending request 71 requestsLock sync.Mutex 72 watchers map[watchPathType][]chan Event 73 watchersLock sync.Mutex 74 75 // Debug (used by unit tests) 76 reconnectDelay time.Duration 77} 78 79type request struct { 80 xid int32 81 opcode int32 82 pkt interface{} 83 recvStruct interface{} 84 recvChan chan response 85 86 // Because sending and receiving happen in separate go routines, there's 87 // a possible race condition when creating watches from outside the read 88 // loop. We must ensure that a watcher gets added to the list synchronously 89 // with the response from the server on any request that creates a watch. 90 // In order to not hard code the watch logic for each opcode in the recv 91 // loop the caller can use recvFunc to insert some synchronously code 92 // after a response. 93 recvFunc func(*request, *responseHeader, error) 94} 95 96type response struct { 97 zxid int64 98 err error 99} 100 101type Event struct { 102 Type EventType 103 State State 104 Path string // For non-session events, the path of the watched node. 105 Err error 106 Server string // For connection events 107} 108 109// Connect establishes a new connection to a pool of zookeeper servers 110// using the default net.Dialer. See ConnectWithDialer for further 111// information about session timeout. 112func Connect(servers []string, sessionTimeout time.Duration) (*Conn, <-chan Event, error) { 113 return ConnectWithDialer(servers, sessionTimeout, nil) 114} 115 116// ConnectWithDialer establishes a new connection to a pool of zookeeper 117// servers. The provided session timeout sets the amount of time for which 118// a session is considered valid after losing connection to a server. Within 119// the session timeout it's possible to reestablish a connection to a different 120// server and keep the same session. This is means any ephemeral nodes and 121// watches are maintained. 122func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Dialer) (*Conn, <-chan Event, error) { 123 if len(servers) == 0 { 124 return nil, nil, errors.New("zk: server list must not be empty") 125 } 126 127 recvTimeout := sessionTimeout * 2 / 3 128 129 srvs := make([]string, len(servers)) 130 131 for i, addr := range servers { 132 if strings.Contains(addr, ":") { 133 srvs[i] = addr 134 } else { 135 srvs[i] = addr + ":" + strconv.Itoa(DefaultPort) 136 } 137 } 138 139 // Randomize the order of the servers to avoid creating hotspots 140 stringShuffle(srvs) 141 142 ec := make(chan Event, eventChanSize) 143 if dialer == nil { 144 dialer = net.DialTimeout 145 } 146 conn := Conn{ 147 dialer: dialer, 148 servers: srvs, 149 serverIndex: 0, 150 lastServerIndex: -1, 151 conn: nil, 152 state: StateDisconnected, 153 eventChan: ec, 154 shouldQuit: make(chan struct{}), 155 recvTimeout: recvTimeout, 156 pingInterval: recvTimeout / 2, 157 connectTimeout: 1 * time.Second, 158 sendChan: make(chan *request, sendChanSize), 159 requests: make(map[int32]*request), 160 watchers: make(map[watchPathType][]chan Event), 161 passwd: emptyPassword, 162 timeout: int32(sessionTimeout.Nanoseconds() / 1e6), 163 164 // Debug 165 reconnectDelay: 0, 166 } 167 go func() { 168 conn.loop() 169 conn.flushRequests(ErrClosing) 170 conn.invalidateWatches(ErrClosing) 171 close(conn.eventChan) 172 }() 173 return &conn, ec, nil 174} 175 176func (c *Conn) Close() { 177 close(c.shouldQuit) 178 179 select { 180 case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil): 181 case <-time.After(time.Second): 182 } 183} 184 185func (c *Conn) State() State { 186 return State(atomic.LoadInt32((*int32)(&c.state))) 187} 188 189func (c *Conn) setState(state State) { 190 atomic.StoreInt32((*int32)(&c.state), int32(state)) 191 select { 192 case c.eventChan <- Event{Type: EventSession, State: state, Server: c.servers[c.serverIndex]}: 193 default: 194 // panic("zk: event channel full - it must be monitored and never allowed to be full") 195 } 196} 197 198func (c *Conn) connect() error { 199 c.setState(StateConnecting) 200 for { 201 c.serverIndex = (c.serverIndex + 1) % len(c.servers) 202 if c.serverIndex == c.lastServerIndex { 203 c.flushUnsentRequests(ErrNoServer) 204 select { 205 case <-time.After(time.Second): 206 // pass 207 case <-c.shouldQuit: 208 c.setState(StateDisconnected) 209 c.flushUnsentRequests(ErrClosing) 210 return ErrClosing 211 } 212 } else if c.lastServerIndex < 0 { 213 // lastServerIndex defaults to -1 to avoid a delay on the initial connect 214 c.lastServerIndex = 0 215 } 216 217 zkConn, err := c.dialer("tcp", c.servers[c.serverIndex], c.connectTimeout) 218 if err == nil { 219 c.conn = zkConn 220 c.setState(StateConnected) 221 return nil 222 } 223 224 log.Printf("Failed to connect to %s: %+v", c.servers[c.serverIndex], err) 225 } 226} 227 228func (c *Conn) loop() { 229 for { 230 if err := c.connect(); err != nil { 231 // c.Close() was called 232 return 233 } 234 235 err := c.authenticate() 236 switch { 237 case err == ErrSessionExpired: 238 c.invalidateWatches(err) 239 case err != nil && c.conn != nil: 240 c.conn.Close() 241 case err == nil: 242 c.lastServerIndex = c.serverIndex 243 closeChan := make(chan struct{}) // channel to tell send loop stop 244 var wg sync.WaitGroup 245 246 wg.Add(1) 247 go func() { 248 c.sendLoop(c.conn, closeChan) 249 c.conn.Close() // causes recv loop to EOF/exit 250 wg.Done() 251 }() 252 253 wg.Add(1) 254 go func() { 255 err = c.recvLoop(c.conn) 256 if err == nil { 257 panic("zk: recvLoop should never return nil error") 258 } 259 close(closeChan) // tell send loop to exit 260 wg.Done() 261 }() 262 263 wg.Wait() 264 } 265 266 c.setState(StateDisconnected) 267 268 // Yeesh 269 if err != io.EOF && err != ErrSessionExpired && !strings.Contains(err.Error(), "use of closed network connection") { 270 log.Println(err) 271 } 272 273 select { 274 case <-c.shouldQuit: 275 c.flushRequests(ErrClosing) 276 return 277 default: 278 } 279 280 if err != ErrSessionExpired { 281 err = ErrConnectionClosed 282 } 283 c.flushRequests(err) 284 285 if c.reconnectDelay > 0 { 286 select { 287 case <-c.shouldQuit: 288 return 289 case <-time.After(c.reconnectDelay): 290 } 291 } 292 } 293} 294 295func (c *Conn) flushUnsentRequests(err error) { 296 for { 297 select { 298 default: 299 return 300 case req := <-c.sendChan: 301 req.recvChan <- response{-1, err} 302 } 303 } 304} 305 306// Send error to all pending requests and clear request map 307func (c *Conn) flushRequests(err error) { 308 c.requestsLock.Lock() 309 for _, req := range c.requests { 310 req.recvChan <- response{-1, err} 311 } 312 c.requests = make(map[int32]*request) 313 c.requestsLock.Unlock() 314} 315 316// Send error to all watchers and clear watchers map 317func (c *Conn) invalidateWatches(err error) { 318 c.watchersLock.Lock() 319 defer c.watchersLock.Unlock() 320 321 if len(c.watchers) >= 0 { 322 for pathType, watchers := range c.watchers { 323 ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: pathType.path, Err: err} 324 for _, ch := range watchers { 325 ch <- ev 326 close(ch) 327 } 328 } 329 c.watchers = make(map[watchPathType][]chan Event) 330 } 331} 332 333func (c *Conn) sendSetWatches() { 334 c.watchersLock.Lock() 335 defer c.watchersLock.Unlock() 336 337 if len(c.watchers) == 0 { 338 return 339 } 340 341 req := &setWatchesRequest{ 342 RelativeZxid: c.lastZxid, 343 DataWatches: make([]string, 0), 344 ExistWatches: make([]string, 0), 345 ChildWatches: make([]string, 0), 346 } 347 n := 0 348 for pathType, watchers := range c.watchers { 349 if len(watchers) == 0 { 350 continue 351 } 352 switch pathType.wType { 353 case watchTypeData: 354 req.DataWatches = append(req.DataWatches, pathType.path) 355 case watchTypeExist: 356 req.ExistWatches = append(req.ExistWatches, pathType.path) 357 case watchTypeChild: 358 req.ChildWatches = append(req.ChildWatches, pathType.path) 359 } 360 n++ 361 } 362 if n == 0 { 363 return 364 } 365 366 go func() { 367 res := &setWatchesResponse{} 368 _, err := c.request(opSetWatches, req, res, nil) 369 if err != nil { 370 log.Printf("Failed to set previous watches: %s", err.Error()) 371 } 372 }() 373} 374 375func (c *Conn) authenticate() error { 376 buf := make([]byte, 256) 377 378 // connect request 379 380 n, err := encodePacket(buf[4:], &connectRequest{ 381 ProtocolVersion: protocolVersion, 382 LastZxidSeen: c.lastZxid, 383 TimeOut: c.timeout, 384 SessionID: c.sessionID, 385 Passwd: c.passwd, 386 }) 387 if err != nil { 388 return err 389 } 390 391 binary.BigEndian.PutUint32(buf[:4], uint32(n)) 392 393 c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10)) 394 _, err = c.conn.Write(buf[:n+4]) 395 c.conn.SetWriteDeadline(time.Time{}) 396 if err != nil { 397 return err 398 } 399 400 c.sendSetWatches() 401 402 // connect response 403 404 // package length 405 c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10)) 406 _, err = io.ReadFull(c.conn, buf[:4]) 407 c.conn.SetReadDeadline(time.Time{}) 408 if err != nil { 409 // Sometimes zookeeper just drops connection on invalid session data, 410 // we prefer to drop session and start from scratch when that event 411 // occurs instead of dropping into loop of connect/disconnect attempts 412 c.sessionID = 0 413 c.passwd = emptyPassword 414 c.lastZxid = 0 415 c.setState(StateExpired) 416 return ErrSessionExpired 417 } 418 419 blen := int(binary.BigEndian.Uint32(buf[:4])) 420 if cap(buf) < blen { 421 buf = make([]byte, blen) 422 } 423 424 _, err = io.ReadFull(c.conn, buf[:blen]) 425 if err != nil { 426 return err 427 } 428 429 r := connectResponse{} 430 _, err = decodePacket(buf[:blen], &r) 431 if err != nil { 432 return err 433 } 434 if r.SessionID == 0 { 435 c.sessionID = 0 436 c.passwd = emptyPassword 437 c.lastZxid = 0 438 c.setState(StateExpired) 439 return ErrSessionExpired 440 } 441 442 if c.sessionID != r.SessionID { 443 atomic.StoreUint32(&c.xid, 0) 444 } 445 c.timeout = r.TimeOut 446 c.sessionID = r.SessionID 447 c.passwd = r.Passwd 448 c.setState(StateHasSession) 449 450 return nil 451} 452 453func (c *Conn) sendLoop(conn net.Conn, closeChan <-chan struct{}) error { 454 pingTicker := time.NewTicker(c.pingInterval) 455 defer pingTicker.Stop() 456 457 buf := make([]byte, bufferSize) 458 for { 459 select { 460 case req := <-c.sendChan: 461 header := &requestHeader{req.xid, req.opcode} 462 n, err := encodePacket(buf[4:], header) 463 if err != nil { 464 req.recvChan <- response{-1, err} 465 continue 466 } 467 468 n2, err := encodePacket(buf[4+n:], req.pkt) 469 if err != nil { 470 req.recvChan <- response{-1, err} 471 continue 472 } 473 474 n += n2 475 476 binary.BigEndian.PutUint32(buf[:4], uint32(n)) 477 478 c.requestsLock.Lock() 479 select { 480 case <-closeChan: 481 req.recvChan <- response{-1, ErrConnectionClosed} 482 c.requestsLock.Unlock() 483 return ErrConnectionClosed 484 default: 485 } 486 c.requests[req.xid] = req 487 c.requestsLock.Unlock() 488 489 conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)) 490 _, err = conn.Write(buf[:n+4]) 491 conn.SetWriteDeadline(time.Time{}) 492 if err != nil { 493 req.recvChan <- response{-1, err} 494 conn.Close() 495 return err 496 } 497 case <-pingTicker.C: 498 n, err := encodePacket(buf[4:], &requestHeader{Xid: -2, Opcode: opPing}) 499 if err != nil { 500 panic("zk: opPing should never fail to serialize") 501 } 502 503 binary.BigEndian.PutUint32(buf[:4], uint32(n)) 504 505 conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)) 506 _, err = conn.Write(buf[:n+4]) 507 conn.SetWriteDeadline(time.Time{}) 508 if err != nil { 509 conn.Close() 510 return err 511 } 512 case <-closeChan: 513 return nil 514 } 515 } 516} 517 518func (c *Conn) recvLoop(conn net.Conn) error { 519 buf := make([]byte, bufferSize) 520 for { 521 // package length 522 conn.SetReadDeadline(time.Now().Add(c.recvTimeout)) 523 _, err := io.ReadFull(conn, buf[:4]) 524 if err != nil { 525 return err 526 } 527 528 blen := int(binary.BigEndian.Uint32(buf[:4])) 529 if cap(buf) < blen { 530 buf = make([]byte, blen) 531 } 532 533 _, err = io.ReadFull(conn, buf[:blen]) 534 conn.SetReadDeadline(time.Time{}) 535 if err != nil { 536 return err 537 } 538 539 res := responseHeader{} 540 _, err = decodePacket(buf[:16], &res) 541 if err != nil { 542 return err 543 } 544 545 if res.Xid == -1 { 546 res := &watcherEvent{} 547 _, err := decodePacket(buf[16:16+blen], res) 548 if err != nil { 549 return err 550 } 551 ev := Event{ 552 Type: res.Type, 553 State: res.State, 554 Path: res.Path, 555 Err: nil, 556 } 557 select { 558 case c.eventChan <- ev: 559 default: 560 } 561 wTypes := make([]watchType, 0, 2) 562 switch res.Type { 563 case EventNodeCreated: 564 wTypes = append(wTypes, watchTypeExist) 565 case EventNodeDeleted, EventNodeDataChanged: 566 wTypes = append(wTypes, watchTypeExist, watchTypeData, watchTypeChild) 567 case EventNodeChildrenChanged: 568 wTypes = append(wTypes, watchTypeChild) 569 } 570 c.watchersLock.Lock() 571 for _, t := range wTypes { 572 wpt := watchPathType{res.Path, t} 573 if watchers := c.watchers[wpt]; watchers != nil && len(watchers) > 0 { 574 for _, ch := range watchers { 575 ch <- ev 576 close(ch) 577 } 578 delete(c.watchers, wpt) 579 } 580 } 581 c.watchersLock.Unlock() 582 } else if res.Xid == -2 { 583 // Ping response. Ignore. 584 } else if res.Xid < 0 { 585 log.Printf("Xid < 0 (%d) but not ping or watcher event", res.Xid) 586 } else { 587 if res.Zxid > 0 { 588 c.lastZxid = res.Zxid 589 } 590 591 c.requestsLock.Lock() 592 req, ok := c.requests[res.Xid] 593 if ok { 594 delete(c.requests, res.Xid) 595 } 596 c.requestsLock.Unlock() 597 598 if !ok { 599 log.Printf("Response for unknown request with xid %d", res.Xid) 600 } else { 601 if res.Err != 0 { 602 err = res.Err.toError() 603 } else { 604 _, err = decodePacket(buf[16:16+blen], req.recvStruct) 605 } 606 if req.recvFunc != nil { 607 req.recvFunc(req, &res, err) 608 } 609 req.recvChan <- response{res.Zxid, err} 610 if req.opcode == opClose { 611 return io.EOF 612 } 613 } 614 } 615 } 616} 617 618func (c *Conn) nextXid() int32 { 619 return int32(atomic.AddUint32(&c.xid, 1) & 0x7fffffff) 620} 621 622func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event { 623 c.watchersLock.Lock() 624 defer c.watchersLock.Unlock() 625 626 ch := make(chan Event, 1) 627 wpt := watchPathType{path, watchType} 628 c.watchers[wpt] = append(c.watchers[wpt], ch) 629 return ch 630} 631 632func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response { 633 rq := &request{ 634 xid: c.nextXid(), 635 opcode: opcode, 636 pkt: req, 637 recvStruct: res, 638 recvChan: make(chan response, 1), 639 recvFunc: recvFunc, 640 } 641 c.sendChan <- rq 642 return rq.recvChan 643} 644 645func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) { 646 r := <-c.queueRequest(opcode, req, res, recvFunc) 647 return r.zxid, r.err 648} 649 650func (c *Conn) AddAuth(scheme string, auth []byte) error { 651 _, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{}, nil) 652 return err 653} 654 655func (c *Conn) Children(path string) ([]string, *Stat, error) { 656 res := &getChildren2Response{} 657 _, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res, nil) 658 return res.Children, &res.Stat, err 659} 660 661func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) { 662 var ech <-chan Event 663 res := &getChildren2Response{} 664 _, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) { 665 if err == nil { 666 ech = c.addWatcher(path, watchTypeChild) 667 } 668 }) 669 if err != nil { 670 return nil, nil, nil, err 671 } 672 return res.Children, &res.Stat, ech, err 673} 674 675func (c *Conn) Get(path string) ([]byte, *Stat, error) { 676 res := &getDataResponse{} 677 _, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res, nil) 678 return res.Data, &res.Stat, err 679} 680 681// GetW returns the contents of a znode and sets a watch 682func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) { 683 var ech <-chan Event 684 res := &getDataResponse{} 685 _, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) { 686 if err == nil { 687 ech = c.addWatcher(path, watchTypeData) 688 } 689 }) 690 if err != nil { 691 return nil, nil, nil, err 692 } 693 return res.Data, &res.Stat, ech, err 694} 695 696func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) { 697 res := &setDataResponse{} 698 _, err := c.request(opSetData, &SetDataRequest{path, data, version}, res, nil) 699 return &res.Stat, err 700} 701 702func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) { 703 res := &createResponse{} 704 _, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil) 705 return res.Path, err 706} 707 708// CreateProtectedEphemeralSequential fixes a race condition if the server crashes 709// after it creates the node. On reconnect the session may still be valid so the 710// ephemeral node still exists. Therefore, on reconnect we need to check if a node 711// with a GUID generated on create exists. 712func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error) { 713 var guid [16]byte 714 _, err := io.ReadFull(rand.Reader, guid[:16]) 715 if err != nil { 716 return "", err 717 } 718 guidStr := fmt.Sprintf("%x", guid) 719 720 parts := strings.Split(path, "/") 721 parts[len(parts)-1] = fmt.Sprintf("%s%s-%s", protectedPrefix, guidStr, parts[len(parts)-1]) 722 rootPath := strings.Join(parts[:len(parts)-1], "/") 723 protectedPath := strings.Join(parts, "/") 724 725 var newPath string 726 for i := 0; i < 3; i++ { 727 newPath, err = c.Create(protectedPath, data, FlagEphemeral|FlagSequence, acl) 728 switch err { 729 case ErrSessionExpired: 730 // No need to search for the node since it can't exist. Just try again. 731 case ErrConnectionClosed: 732 children, _, err := c.Children(rootPath) 733 if err != nil { 734 return "", err 735 } 736 for _, p := range children { 737 parts := strings.Split(p, "/") 738 if pth := parts[len(parts)-1]; strings.HasPrefix(pth, protectedPrefix) { 739 if g := pth[len(protectedPrefix) : len(protectedPrefix)+32]; g == guidStr { 740 return rootPath + "/" + p, nil 741 } 742 } 743 } 744 case nil: 745 return newPath, nil 746 default: 747 return "", err 748 } 749 } 750 return "", err 751} 752 753func (c *Conn) Delete(path string, version int32) error { 754 _, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{}, nil) 755 return err 756} 757 758func (c *Conn) Exists(path string) (bool, *Stat, error) { 759 res := &existsResponse{} 760 _, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res, nil) 761 exists := true 762 if err == ErrNoNode { 763 exists = false 764 err = nil 765 } 766 return exists, &res.Stat, err 767} 768 769func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) { 770 var ech <-chan Event 771 res := &existsResponse{} 772 _, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) { 773 if err == nil { 774 ech = c.addWatcher(path, watchTypeData) 775 } else if err == ErrNoNode { 776 ech = c.addWatcher(path, watchTypeExist) 777 } 778 }) 779 exists := true 780 if err == ErrNoNode { 781 exists = false 782 err = nil 783 } 784 if err != nil { 785 return false, nil, nil, err 786 } 787 return exists, &res.Stat, ech, err 788} 789 790func (c *Conn) GetACL(path string) ([]ACL, *Stat, error) { 791 res := &getAclResponse{} 792 _, err := c.request(opGetAcl, &getAclRequest{Path: path}, res, nil) 793 return res.Acl, &res.Stat, err 794} 795 796func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) { 797 res := &setAclResponse{} 798 _, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res, nil) 799 return &res.Stat, err 800} 801 802func (c *Conn) Sync(path string) (string, error) { 803 res := &syncResponse{} 804 _, err := c.request(opSync, &syncRequest{Path: path}, res, nil) 805 return res.Path, err 806} 807 808type MultiResponse struct { 809 Stat *Stat 810 String string 811} 812 813// Multi executes multiple ZooKeeper operations or none of them. The provided 814// ops must be one of *CreateRequest, *DeleteRequest, *SetDataRequest, or 815// *CheckVersionRequest. 816func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) { 817 req := &multiRequest{ 818 Ops: make([]multiRequestOp, 0, len(ops)), 819 DoneHeader: multiHeader{Type: -1, Done: true, Err: -1}, 820 } 821 for _, op := range ops { 822 var opCode int32 823 switch op.(type) { 824 case *CreateRequest: 825 opCode = opCreate 826 case *SetDataRequest: 827 opCode = opSetData 828 case *DeleteRequest: 829 opCode = opDelete 830 case *CheckVersionRequest: 831 opCode = opCheck 832 default: 833 return nil, fmt.Errorf("uknown operation type %T", op) 834 } 835 req.Ops = append(req.Ops, multiRequestOp{multiHeader{opCode, false, -1}, op}) 836 } 837 res := &multiResponse{} 838 _, err := c.request(opMulti, req, res, nil) 839 mr := make([]MultiResponse, len(res.Ops)) 840 for i, op := range res.Ops { 841 mr[i] = MultiResponse{Stat: op.Stat, String: op.String} 842 } 843 return mr, err 844} 845