1package api 2 3import ( 4 "encoding/json" 5 "errors" 6 "io" 7 "io/ioutil" 8 "strconv" 9 "strings" 10 "time" 11) 12 13// Operator can be used to perform low-level operator tasks for Nomad. 14type Operator struct { 15 c *Client 16} 17 18// Operator returns a handle to the operator endpoints. 19func (c *Client) Operator() *Operator { 20 return &Operator{c} 21} 22 23// RaftServer has information about a server in the Raft configuration. 24type RaftServer struct { 25 // ID is the unique ID for the server. These are currently the same 26 // as the address, but they will be changed to a real GUID in a future 27 // release of Nomad. 28 ID string 29 30 // Node is the node name of the server, as known by Nomad, or this 31 // will be set to "(unknown)" otherwise. 32 Node string 33 34 // Address is the IP:port of the server, used for Raft communications. 35 Address string 36 37 // Leader is true if this server is the current cluster leader. 38 Leader bool 39 40 // Voter is true if this server has a vote in the cluster. This might 41 // be false if the server is staging and still coming online, or if 42 // it's a non-voting server, which will be added in a future release of 43 // Nomad. 44 Voter bool 45 46 // RaftProtocol is the version of the Raft protocol spoken by this server. 47 RaftProtocol string 48} 49 50// RaftConfiguration is returned when querying for the current Raft configuration. 51type RaftConfiguration struct { 52 // Servers has the list of servers in the Raft configuration. 53 Servers []*RaftServer 54 55 // Index has the Raft index of this configuration. 56 Index uint64 57} 58 59// RaftGetConfiguration is used to query the current Raft peer set. 60func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { 61 r, err := op.c.newRequest("GET", "/v1/operator/raft/configuration") 62 if err != nil { 63 return nil, err 64 } 65 r.setQueryOptions(q) 66 _, resp, err := requireOK(op.c.doRequest(r)) 67 if err != nil { 68 return nil, err 69 } 70 defer resp.Body.Close() 71 72 var out RaftConfiguration 73 if err := decodeBody(resp, &out); err != nil { 74 return nil, err 75 } 76 return &out, nil 77} 78 79// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft 80// quorum but no longer known to Serf or the catalog) by address in the form of 81// "IP:port". 82func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) error { 83 r, err := op.c.newRequest("DELETE", "/v1/operator/raft/peer") 84 if err != nil { 85 return err 86 } 87 r.setWriteOptions(q) 88 89 r.params.Set("address", address) 90 91 _, resp, err := requireOK(op.c.doRequest(r)) 92 if err != nil { 93 return err 94 } 95 96 resp.Body.Close() 97 return nil 98} 99 100// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft 101// quorum but no longer known to Serf or the catalog) by ID. 102func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error { 103 r, err := op.c.newRequest("DELETE", "/v1/operator/raft/peer") 104 if err != nil { 105 return err 106 } 107 r.setWriteOptions(q) 108 109 r.params.Set("id", id) 110 111 _, resp, err := requireOK(op.c.doRequest(r)) 112 if err != nil { 113 return err 114 } 115 116 resp.Body.Close() 117 return nil 118} 119 120// SchedulerConfiguration is the config for controlling scheduler behavior 121type SchedulerConfiguration struct { 122 // SchedulerAlgorithm lets you select between available scheduling algorithms. 123 SchedulerAlgorithm SchedulerAlgorithm 124 125 // PreemptionConfig specifies whether to enable eviction of lower 126 // priority jobs to place higher priority jobs. 127 PreemptionConfig PreemptionConfig 128 129 // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled 130 MemoryOversubscriptionEnabled bool 131 132 // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. 133 CreateIndex uint64 134 ModifyIndex uint64 135} 136 137// SchedulerConfigurationResponse is the response object that wraps SchedulerConfiguration 138type SchedulerConfigurationResponse struct { 139 // SchedulerConfig contains scheduler config options 140 SchedulerConfig *SchedulerConfiguration 141 142 QueryMeta 143} 144 145// SchedulerSetConfigurationResponse is the response object used 146// when updating scheduler configuration 147type SchedulerSetConfigurationResponse struct { 148 // Updated returns whether the config was actually updated 149 // Only set when the request uses CAS 150 Updated bool 151 152 WriteMeta 153} 154 155// SchedulerAlgorithm is an enum string that encapsulates the valid options for a 156// SchedulerConfiguration stanza's SchedulerAlgorithm. These modes will allow the 157// scheduler to be user-selectable. 158type SchedulerAlgorithm string 159 160const ( 161 SchedulerAlgorithmBinpack SchedulerAlgorithm = "binpack" 162 SchedulerAlgorithmSpread SchedulerAlgorithm = "spread" 163) 164 165// PreemptionConfig specifies whether preemption is enabled based on scheduler type 166type PreemptionConfig struct { 167 SystemSchedulerEnabled bool 168 BatchSchedulerEnabled bool 169 ServiceSchedulerEnabled bool 170} 171 172// SchedulerGetConfiguration is used to query the current Scheduler configuration. 173func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) { 174 var resp SchedulerConfigurationResponse 175 qm, err := op.c.query("/v1/operator/scheduler/configuration", &resp, q) 176 if err != nil { 177 return nil, nil, err 178 } 179 return &resp, qm, nil 180} 181 182// SchedulerSetConfiguration is used to set the current Scheduler configuration. 183func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*SchedulerSetConfigurationResponse, *WriteMeta, error) { 184 var out SchedulerSetConfigurationResponse 185 wm, err := op.c.write("/v1/operator/scheduler/configuration", conf, &out, q) 186 if err != nil { 187 return nil, nil, err 188 } 189 return &out, wm, nil 190} 191 192// SchedulerCASConfiguration is used to perform a Check-And-Set update on the 193// Scheduler configuration. The ModifyIndex value will be respected. Returns 194// true on success or false on failures. 195func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*SchedulerSetConfigurationResponse, *WriteMeta, error) { 196 var out SchedulerSetConfigurationResponse 197 wm, err := op.c.write("/v1/operator/scheduler/configuration?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q) 198 if err != nil { 199 return nil, nil, err 200 } 201 202 return &out, wm, nil 203} 204 205// Snapshot is used to capture a snapshot state of a running cluster. 206// The returned reader that must be consumed fully 207func (op *Operator) Snapshot(q *QueryOptions) (io.ReadCloser, error) { 208 r, err := op.c.newRequest("GET", "/v1/operator/snapshot") 209 if err != nil { 210 return nil, err 211 } 212 r.setQueryOptions(q) 213 _, resp, err := requireOK(op.c.doRequest(r)) 214 if err != nil { 215 return nil, err 216 } 217 218 digest := resp.Header.Get("Digest") 219 220 cr, err := newChecksumValidatingReader(resp.Body, digest) 221 if err != nil { 222 io.Copy(ioutil.Discard, resp.Body) 223 resp.Body.Close() 224 225 return nil, err 226 } 227 228 return cr, nil 229} 230 231// SnapshotRestore is used to restore a running nomad cluster to an original 232// state. 233func (op *Operator) SnapshotRestore(in io.Reader, q *WriteOptions) (*WriteMeta, error) { 234 wm, err := op.c.write("/v1/operator/snapshot", in, nil, q) 235 if err != nil { 236 return nil, err 237 } 238 239 return wm, nil 240} 241 242type License struct { 243 // The unique identifier of the license 244 LicenseID string 245 246 // The customer ID associated with the license 247 CustomerID string 248 249 // If set, an identifier that should be used to lock the license to a 250 // particular site, cluster, etc. 251 InstallationID string 252 253 // The time at which the license was issued 254 IssueTime time.Time 255 256 // The time at which the license starts being valid 257 StartTime time.Time 258 259 // The time after which the license expires 260 ExpirationTime time.Time 261 262 // The time at which the license ceases to function and can 263 // no longer be used in any capacity 264 TerminationTime time.Time 265 266 // The product the license is valid for 267 Product string 268 269 // License Specific Flags 270 Flags map[string]interface{} 271 272 // Modules is a list of the licensed enterprise modules 273 Modules []string 274 275 // List of features enabled by the license 276 Features []string 277} 278 279type LicenseReply struct { 280 License *License 281 ConfigOutdated bool 282 QueryMeta 283} 284 285type ApplyLicenseOptions struct { 286 Force bool 287} 288 289func (op *Operator) LicensePut(license string, q *WriteOptions) (*WriteMeta, error) { 290 return op.ApplyLicense(license, nil, q) 291} 292 293func (op *Operator) ApplyLicense(license string, opts *ApplyLicenseOptions, q *WriteOptions) (*WriteMeta, error) { 294 r, err := op.c.newRequest("PUT", "/v1/operator/license") 295 if err != nil { 296 return nil, err 297 } 298 299 if opts != nil && opts.Force { 300 r.params.Add("force", "true") 301 } 302 303 r.setWriteOptions(q) 304 r.body = strings.NewReader(license) 305 306 rtt, resp, err := requireOK(op.c.doRequest(r)) 307 if err != nil { 308 return nil, err 309 } 310 defer resp.Body.Close() 311 312 wm := &WriteMeta{RequestTime: rtt} 313 parseWriteMeta(resp, wm) 314 315 return wm, nil 316} 317 318func (op *Operator) LicenseGet(q *QueryOptions) (*LicenseReply, *QueryMeta, error) { 319 req, err := op.c.newRequest("GET", "/v1/operator/license") 320 if err != nil { 321 return nil, nil, err 322 } 323 req.setQueryOptions(q) 324 325 var reply LicenseReply 326 rtt, resp, err := op.c.doRequest(req) 327 if err != nil { 328 return nil, nil, err 329 } 330 defer resp.Body.Close() 331 332 if resp.StatusCode == 204 { 333 return nil, nil, errors.New("Nomad Enterprise only endpoint") 334 } 335 336 err = json.NewDecoder(resp.Body).Decode(&reply) 337 if err != nil { 338 return nil, nil, err 339 } 340 341 qm := &QueryMeta{} 342 parseQueryMeta(resp, qm) 343 qm.RequestTime = rtt 344 345 return &reply, qm, nil 346} 347