1package api 2 3import ( 4 "errors" 5 "fmt" 6 "time" 7) 8 9const ( 10 // SessionBehaviorRelease is the default behavior and causes 11 // all associated locks to be released on session invalidation. 12 SessionBehaviorRelease = "release" 13 14 // SessionBehaviorDelete is new in Consul 0.5 and changes the 15 // behavior to delete all associated locks on session invalidation. 16 // It can be used in a way similar to Ephemeral Nodes in ZooKeeper. 17 SessionBehaviorDelete = "delete" 18) 19 20var ErrSessionExpired = errors.New("session expired") 21 22// SessionEntry represents a session in consul 23type SessionEntry struct { 24 CreateIndex uint64 25 ID string 26 Name string 27 Node string 28 LockDelay time.Duration 29 Behavior string 30 TTL string 31 Namespace string `json:",omitempty"` 32 33 // Deprecated for Consul Enterprise in v1.7.0. 34 Checks []string 35 36 // NodeChecks and ServiceChecks are new in Consul 1.7.0. 37 // When associating checks with sessions, namespaces can be specified for service checks. 38 NodeChecks []string 39 ServiceChecks []ServiceCheck 40} 41 42type ServiceCheck struct { 43 ID string 44 Namespace string 45} 46 47// Session can be used to query the Session endpoints 48type Session struct { 49 c *Client 50} 51 52// Session returns a handle to the session endpoints 53func (c *Client) Session() *Session { 54 return &Session{c} 55} 56 57// CreateNoChecks is like Create but is used specifically to create 58// a session with no associated health checks. 59func (s *Session) CreateNoChecks(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, error) { 60 body := make(map[string]interface{}) 61 body["NodeChecks"] = []string{} 62 if se != nil { 63 if se.Name != "" { 64 body["Name"] = se.Name 65 } 66 if se.Node != "" { 67 body["Node"] = se.Node 68 } 69 if se.LockDelay != 0 { 70 body["LockDelay"] = durToMsec(se.LockDelay) 71 } 72 if se.Behavior != "" { 73 body["Behavior"] = se.Behavior 74 } 75 if se.TTL != "" { 76 body["TTL"] = se.TTL 77 } 78 } 79 return s.create(body, q) 80 81} 82 83// Create makes a new session. Providing a session entry can 84// customize the session. It can also be nil to use defaults. 85func (s *Session) Create(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, error) { 86 var obj interface{} 87 if se != nil { 88 body := make(map[string]interface{}) 89 obj = body 90 if se.Name != "" { 91 body["Name"] = se.Name 92 } 93 if se.Node != "" { 94 body["Node"] = se.Node 95 } 96 if se.LockDelay != 0 { 97 body["LockDelay"] = durToMsec(se.LockDelay) 98 } 99 if len(se.Checks) > 0 { 100 body["Checks"] = se.Checks 101 } 102 if len(se.NodeChecks) > 0 { 103 body["NodeChecks"] = se.NodeChecks 104 } 105 if len(se.ServiceChecks) > 0 { 106 body["ServiceChecks"] = se.ServiceChecks 107 } 108 if se.Behavior != "" { 109 body["Behavior"] = se.Behavior 110 } 111 if se.TTL != "" { 112 body["TTL"] = se.TTL 113 } 114 } 115 return s.create(obj, q) 116} 117 118func (s *Session) create(obj interface{}, q *WriteOptions) (string, *WriteMeta, error) { 119 var out struct{ ID string } 120 wm, err := s.c.write("/v1/session/create", obj, &out, q) 121 if err != nil { 122 return "", nil, err 123 } 124 return out.ID, wm, nil 125} 126 127// Destroy invalidates a given session 128func (s *Session) Destroy(id string, q *WriteOptions) (*WriteMeta, error) { 129 wm, err := s.c.write("/v1/session/destroy/"+id, nil, nil, q) 130 if err != nil { 131 return nil, err 132 } 133 return wm, nil 134} 135 136// Renew renews the TTL on a given session 137func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta, error) { 138 r := s.c.newRequest("PUT", "/v1/session/renew/"+id) 139 r.setWriteOptions(q) 140 rtt, resp, err := s.c.doRequest(r) 141 if err != nil { 142 return nil, nil, err 143 } 144 defer closeResponseBody(resp) 145 146 wm := &WriteMeta{RequestTime: rtt} 147 148 if resp.StatusCode == 404 { 149 return nil, wm, nil 150 } else if resp.StatusCode != 200 { 151 return nil, nil, fmt.Errorf("Unexpected response code: %d", resp.StatusCode) 152 } 153 154 var entries []*SessionEntry 155 if err := decodeBody(resp, &entries); err != nil { 156 return nil, nil, fmt.Errorf("Failed to read response: %v", err) 157 } 158 if len(entries) > 0 { 159 return entries[0], wm, nil 160 } 161 return nil, wm, nil 162} 163 164// RenewPeriodic is used to periodically invoke Session.Renew on a 165// session until a doneCh is closed. This is meant to be used in a long running 166// goroutine to ensure a session stays valid. 167func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh <-chan struct{}) error { 168 ctx := q.Context() 169 170 ttl, err := time.ParseDuration(initialTTL) 171 if err != nil { 172 return err 173 } 174 175 waitDur := ttl / 2 176 lastRenewTime := time.Now() 177 var lastErr error 178 for { 179 if time.Since(lastRenewTime) > ttl { 180 return lastErr 181 } 182 select { 183 case <-time.After(waitDur): 184 entry, _, err := s.Renew(id, q) 185 if err != nil { 186 waitDur = time.Second 187 lastErr = err 188 continue 189 } 190 if entry == nil { 191 return ErrSessionExpired 192 } 193 194 // Handle the server updating the TTL 195 ttl, _ = time.ParseDuration(entry.TTL) 196 waitDur = ttl / 2 197 lastRenewTime = time.Now() 198 199 case <-doneCh: 200 // Attempt a session destroy 201 s.Destroy(id, q) 202 return nil 203 204 case <-ctx.Done(): 205 // Bail immediately since attempting the destroy would 206 // use the canceled context in q, which would just bail. 207 return ctx.Err() 208 } 209 } 210} 211 212// Info looks up a single session 213func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, error) { 214 var entries []*SessionEntry 215 qm, err := s.c.query("/v1/session/info/"+id, &entries, q) 216 if err != nil { 217 return nil, nil, err 218 } 219 if len(entries) > 0 { 220 return entries[0], qm, nil 221 } 222 return nil, qm, nil 223} 224 225// List gets sessions for a node 226func (s *Session) Node(node string, q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) { 227 var entries []*SessionEntry 228 qm, err := s.c.query("/v1/session/node/"+node, &entries, q) 229 if err != nil { 230 return nil, nil, err 231 } 232 return entries, qm, nil 233} 234 235// List gets all active sessions 236func (s *Session) List(q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) { 237 var entries []*SessionEntry 238 qm, err := s.c.query("/v1/session/list", &entries, q) 239 if err != nil { 240 return nil, nil, err 241 } 242 return entries, qm, nil 243} 244