1package hcs 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "strings" 8 "sync" 9 "syscall" 10 11 "github.com/Microsoft/hcsshim/internal/cow" 12 "github.com/Microsoft/hcsshim/internal/hcs/schema1" 13 hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2" 14 "github.com/Microsoft/hcsshim/internal/log" 15 "github.com/Microsoft/hcsshim/internal/oc" 16 "github.com/Microsoft/hcsshim/internal/timeout" 17 "github.com/Microsoft/hcsshim/internal/vmcompute" 18 "go.opencensus.io/trace" 19) 20 21type System struct { 22 handleLock sync.RWMutex 23 handle vmcompute.HcsSystem 24 id string 25 callbackNumber uintptr 26 27 closedWaitOnce sync.Once 28 waitBlock chan struct{} 29 waitError error 30 exitError error 31 os, typ string 32} 33 34func newSystem(id string) *System { 35 return &System{ 36 id: id, 37 waitBlock: make(chan struct{}), 38 } 39} 40 41// CreateComputeSystem creates a new compute system with the given configuration but does not start it. 42func CreateComputeSystem(ctx context.Context, id string, hcsDocumentInterface interface{}) (_ *System, err error) { 43 operation := "hcs::CreateComputeSystem" 44 45 // hcsCreateComputeSystemContext is an async operation. Start the outer span 46 // here to measure the full create time. 47 ctx, span := trace.StartSpan(ctx, operation) 48 defer span.End() 49 defer func() { oc.SetSpanStatus(span, err) }() 50 span.AddAttributes(trace.StringAttribute("cid", id)) 51 52 computeSystem := newSystem(id) 53 54 hcsDocumentB, err := json.Marshal(hcsDocumentInterface) 55 if err != nil { 56 return nil, err 57 } 58 59 hcsDocument := string(hcsDocumentB) 60 61 var ( 62 identity syscall.Handle 63 resultJSON string 64 createError error 65 ) 66 computeSystem.handle, resultJSON, createError = vmcompute.HcsCreateComputeSystem(ctx, id, hcsDocument, identity) 67 if createError == nil || IsPending(createError) { 68 defer func() { 69 if err != nil { 70 computeSystem.Close() 71 } 72 }() 73 if err = computeSystem.registerCallback(ctx); err != nil { 74 // Terminate the compute system if it still exists. We're okay to 75 // ignore a failure here. 76 _ = computeSystem.Terminate(ctx) 77 return nil, makeSystemError(computeSystem, operation, err, nil) 78 } 79 } 80 81 events, err := processAsyncHcsResult(ctx, createError, resultJSON, computeSystem.callbackNumber, hcsNotificationSystemCreateCompleted, &timeout.SystemCreate) 82 if err != nil { 83 if err == ErrTimeout { 84 // Terminate the compute system if it still exists. We're okay to 85 // ignore a failure here. 86 _ = computeSystem.Terminate(ctx) 87 } 88 return nil, makeSystemError(computeSystem, operation, err, events) 89 } 90 go computeSystem.waitBackground() 91 if err = computeSystem.getCachedProperties(ctx); err != nil { 92 return nil, err 93 } 94 return computeSystem, nil 95} 96 97// OpenComputeSystem opens an existing compute system by ID. 98func OpenComputeSystem(ctx context.Context, id string) (*System, error) { 99 operation := "hcs::OpenComputeSystem" 100 101 computeSystem := newSystem(id) 102 handle, resultJSON, err := vmcompute.HcsOpenComputeSystem(ctx, id) 103 events := processHcsResult(ctx, resultJSON) 104 if err != nil { 105 return nil, makeSystemError(computeSystem, operation, err, events) 106 } 107 computeSystem.handle = handle 108 defer func() { 109 if err != nil { 110 computeSystem.Close() 111 } 112 }() 113 if err = computeSystem.registerCallback(ctx); err != nil { 114 return nil, makeSystemError(computeSystem, operation, err, nil) 115 } 116 go computeSystem.waitBackground() 117 if err = computeSystem.getCachedProperties(ctx); err != nil { 118 return nil, err 119 } 120 return computeSystem, nil 121} 122 123func (computeSystem *System) getCachedProperties(ctx context.Context) error { 124 props, err := computeSystem.Properties(ctx) 125 if err != nil { 126 return err 127 } 128 computeSystem.typ = strings.ToLower(props.SystemType) 129 computeSystem.os = strings.ToLower(props.RuntimeOSType) 130 if computeSystem.os == "" && computeSystem.typ == "container" { 131 // Pre-RS5 HCS did not return the OS, but it only supported containers 132 // that ran Windows. 133 computeSystem.os = "windows" 134 } 135 return nil 136} 137 138// OS returns the operating system of the compute system, "linux" or "windows". 139func (computeSystem *System) OS() string { 140 return computeSystem.os 141} 142 143// IsOCI returns whether processes in the compute system should be created via 144// OCI. 145func (computeSystem *System) IsOCI() bool { 146 return computeSystem.os == "linux" && computeSystem.typ == "container" 147} 148 149// GetComputeSystems gets a list of the compute systems on the system that match the query 150func GetComputeSystems(ctx context.Context, q schema1.ComputeSystemQuery) ([]schema1.ContainerProperties, error) { 151 operation := "hcs::GetComputeSystems" 152 153 queryb, err := json.Marshal(q) 154 if err != nil { 155 return nil, err 156 } 157 158 computeSystemsJSON, resultJSON, err := vmcompute.HcsEnumerateComputeSystems(ctx, string(queryb)) 159 events := processHcsResult(ctx, resultJSON) 160 if err != nil { 161 return nil, &HcsError{Op: operation, Err: err, Events: events} 162 } 163 164 if computeSystemsJSON == "" { 165 return nil, ErrUnexpectedValue 166 } 167 computeSystems := []schema1.ContainerProperties{} 168 if err = json.Unmarshal([]byte(computeSystemsJSON), &computeSystems); err != nil { 169 return nil, err 170 } 171 172 return computeSystems, nil 173} 174 175// Start synchronously starts the computeSystem. 176func (computeSystem *System) Start(ctx context.Context) (err error) { 177 operation := "hcs::System::Start" 178 179 // hcsStartComputeSystemContext is an async operation. Start the outer span 180 // here to measure the full start time. 181 ctx, span := trace.StartSpan(ctx, operation) 182 defer span.End() 183 defer func() { oc.SetSpanStatus(span, err) }() 184 span.AddAttributes(trace.StringAttribute("cid", computeSystem.id)) 185 186 computeSystem.handleLock.RLock() 187 defer computeSystem.handleLock.RUnlock() 188 189 if computeSystem.handle == 0 { 190 return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil) 191 } 192 193 resultJSON, err := vmcompute.HcsStartComputeSystem(ctx, computeSystem.handle, "") 194 events, err := processAsyncHcsResult(ctx, err, resultJSON, computeSystem.callbackNumber, hcsNotificationSystemStartCompleted, &timeout.SystemStart) 195 if err != nil { 196 return makeSystemError(computeSystem, operation, err, events) 197 } 198 199 return nil 200} 201 202// ID returns the compute system's identifier. 203func (computeSystem *System) ID() string { 204 return computeSystem.id 205} 206 207// Shutdown requests a compute system shutdown. 208func (computeSystem *System) Shutdown(ctx context.Context) error { 209 computeSystem.handleLock.RLock() 210 defer computeSystem.handleLock.RUnlock() 211 212 operation := "hcs::System::Shutdown" 213 214 if computeSystem.handle == 0 { 215 return nil 216 } 217 218 resultJSON, err := vmcompute.HcsShutdownComputeSystem(ctx, computeSystem.handle, "") 219 events := processHcsResult(ctx, resultJSON) 220 switch err { 221 case nil, ErrVmcomputeAlreadyStopped, ErrComputeSystemDoesNotExist, ErrVmcomputeOperationPending: 222 default: 223 return makeSystemError(computeSystem, operation, err, events) 224 } 225 return nil 226} 227 228// Terminate requests a compute system terminate. 229func (computeSystem *System) Terminate(ctx context.Context) error { 230 computeSystem.handleLock.RLock() 231 defer computeSystem.handleLock.RUnlock() 232 233 operation := "hcs::System::Terminate" 234 235 if computeSystem.handle == 0 { 236 return nil 237 } 238 239 resultJSON, err := vmcompute.HcsTerminateComputeSystem(ctx, computeSystem.handle, "") 240 events := processHcsResult(ctx, resultJSON) 241 switch err { 242 case nil, ErrVmcomputeAlreadyStopped, ErrComputeSystemDoesNotExist, ErrVmcomputeOperationPending: 243 default: 244 return makeSystemError(computeSystem, operation, err, events) 245 } 246 return nil 247} 248 249// waitBackground waits for the compute system exit notification. Once received 250// sets `computeSystem.waitError` (if any) and unblocks all `Wait` calls. 251// 252// This MUST be called exactly once per `computeSystem.handle` but `Wait` is 253// safe to call multiple times. 254func (computeSystem *System) waitBackground() { 255 operation := "hcs::System::waitBackground" 256 ctx, span := trace.StartSpan(context.Background(), operation) 257 defer span.End() 258 span.AddAttributes(trace.StringAttribute("cid", computeSystem.id)) 259 260 err := waitForNotification(ctx, computeSystem.callbackNumber, hcsNotificationSystemExited, nil) 261 switch err { 262 case nil: 263 log.G(ctx).Debug("system exited") 264 case ErrVmcomputeUnexpectedExit: 265 log.G(ctx).Debug("unexpected system exit") 266 computeSystem.exitError = makeSystemError(computeSystem, operation, err, nil) 267 err = nil 268 default: 269 err = makeSystemError(computeSystem, operation, err, nil) 270 } 271 computeSystem.closedWaitOnce.Do(func() { 272 computeSystem.waitError = err 273 close(computeSystem.waitBlock) 274 }) 275 oc.SetSpanStatus(span, err) 276} 277 278// Wait synchronously waits for the compute system to shutdown or terminate. If 279// the compute system has already exited returns the previous error (if any). 280func (computeSystem *System) Wait() error { 281 <-computeSystem.waitBlock 282 return computeSystem.waitError 283} 284 285// ExitError returns an error describing the reason the compute system terminated. 286func (computeSystem *System) ExitError() error { 287 select { 288 case <-computeSystem.waitBlock: 289 if computeSystem.waitError != nil { 290 return computeSystem.waitError 291 } 292 return computeSystem.exitError 293 default: 294 return errors.New("container not exited") 295 } 296} 297 298// Properties returns the requested container properties targeting a V1 schema container. 299func (computeSystem *System) Properties(ctx context.Context, types ...schema1.PropertyType) (*schema1.ContainerProperties, error) { 300 computeSystem.handleLock.RLock() 301 defer computeSystem.handleLock.RUnlock() 302 303 operation := "hcs::System::Properties" 304 305 queryBytes, err := json.Marshal(schema1.PropertyQuery{PropertyTypes: types}) 306 if err != nil { 307 return nil, makeSystemError(computeSystem, operation, err, nil) 308 } 309 310 propertiesJSON, resultJSON, err := vmcompute.HcsGetComputeSystemProperties(ctx, computeSystem.handle, string(queryBytes)) 311 events := processHcsResult(ctx, resultJSON) 312 if err != nil { 313 return nil, makeSystemError(computeSystem, operation, err, events) 314 } 315 316 if propertiesJSON == "" { 317 return nil, ErrUnexpectedValue 318 } 319 properties := &schema1.ContainerProperties{} 320 if err := json.Unmarshal([]byte(propertiesJSON), properties); err != nil { 321 return nil, makeSystemError(computeSystem, operation, err, nil) 322 } 323 324 return properties, nil 325} 326 327// PropertiesV2 returns the requested container properties targeting a V2 schema container. 328func (computeSystem *System) PropertiesV2(ctx context.Context, types ...hcsschema.PropertyType) (*hcsschema.Properties, error) { 329 computeSystem.handleLock.RLock() 330 defer computeSystem.handleLock.RUnlock() 331 332 operation := "hcs::System::PropertiesV2" 333 334 queryBytes, err := json.Marshal(hcsschema.PropertyQuery{PropertyTypes: types}) 335 if err != nil { 336 return nil, makeSystemError(computeSystem, operation, err, nil) 337 } 338 339 propertiesJSON, resultJSON, err := vmcompute.HcsGetComputeSystemProperties(ctx, computeSystem.handle, string(queryBytes)) 340 events := processHcsResult(ctx, resultJSON) 341 if err != nil { 342 return nil, makeSystemError(computeSystem, operation, err, events) 343 } 344 345 if propertiesJSON == "" { 346 return nil, ErrUnexpectedValue 347 } 348 properties := &hcsschema.Properties{} 349 if err := json.Unmarshal([]byte(propertiesJSON), properties); err != nil { 350 return nil, makeSystemError(computeSystem, operation, err, nil) 351 } 352 353 return properties, nil 354} 355 356// Pause pauses the execution of the computeSystem. This feature is not enabled in TP5. 357func (computeSystem *System) Pause(ctx context.Context) (err error) { 358 operation := "hcs::System::Pause" 359 360 // hcsPauseComputeSystemContext is an async peration. Start the outer span 361 // here to measure the full pause time. 362 ctx, span := trace.StartSpan(ctx, operation) 363 defer span.End() 364 defer func() { oc.SetSpanStatus(span, err) }() 365 span.AddAttributes(trace.StringAttribute("cid", computeSystem.id)) 366 367 computeSystem.handleLock.RLock() 368 defer computeSystem.handleLock.RUnlock() 369 370 if computeSystem.handle == 0 { 371 return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil) 372 } 373 374 resultJSON, err := vmcompute.HcsPauseComputeSystem(ctx, computeSystem.handle, "") 375 events, err := processAsyncHcsResult(ctx, err, resultJSON, computeSystem.callbackNumber, hcsNotificationSystemPauseCompleted, &timeout.SystemPause) 376 if err != nil { 377 return makeSystemError(computeSystem, operation, err, events) 378 } 379 380 return nil 381} 382 383// Resume resumes the execution of the computeSystem. This feature is not enabled in TP5. 384func (computeSystem *System) Resume(ctx context.Context) (err error) { 385 operation := "hcs::System::Resume" 386 387 // hcsResumeComputeSystemContext is an async operation. Start the outer span 388 // here to measure the full restore time. 389 ctx, span := trace.StartSpan(ctx, operation) 390 defer span.End() 391 defer func() { oc.SetSpanStatus(span, err) }() 392 span.AddAttributes(trace.StringAttribute("cid", computeSystem.id)) 393 394 computeSystem.handleLock.RLock() 395 defer computeSystem.handleLock.RUnlock() 396 397 if computeSystem.handle == 0 { 398 return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil) 399 } 400 401 resultJSON, err := vmcompute.HcsResumeComputeSystem(ctx, computeSystem.handle, "") 402 events, err := processAsyncHcsResult(ctx, err, resultJSON, computeSystem.callbackNumber, hcsNotificationSystemResumeCompleted, &timeout.SystemResume) 403 if err != nil { 404 return makeSystemError(computeSystem, operation, err, events) 405 } 406 407 return nil 408} 409 410// Save the compute system 411func (computeSystem *System) Save(ctx context.Context, options interface{}) (err error) { 412 operation := "hcs::System::Save" 413 414 // hcsSaveComputeSystemContext is an async peration. Start the outer span 415 // here to measure the full save time. 416 ctx, span := trace.StartSpan(ctx, operation) 417 defer span.End() 418 defer func() { oc.SetSpanStatus(span, err) }() 419 span.AddAttributes(trace.StringAttribute("cid", computeSystem.id)) 420 421 saveOptions, err := json.Marshal(options) 422 if err != nil { 423 return err 424 } 425 426 computeSystem.handleLock.RLock() 427 defer computeSystem.handleLock.RUnlock() 428 429 if computeSystem.handle == 0 { 430 return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil) 431 } 432 433 result, err := vmcompute.HcsSaveComputeSystem(ctx, computeSystem.handle, string(saveOptions)) 434 events, err := processAsyncHcsResult(ctx, err, result, computeSystem.callbackNumber, hcsNotificationSystemSaveCompleted, &timeout.SystemSave) 435 if err != nil { 436 return makeSystemError(computeSystem, operation, err, events) 437 } 438 439 return nil 440} 441 442func (computeSystem *System) createProcess(ctx context.Context, operation string, c interface{}) (*Process, *vmcompute.HcsProcessInformation, error) { 443 computeSystem.handleLock.RLock() 444 defer computeSystem.handleLock.RUnlock() 445 446 if computeSystem.handle == 0 { 447 return nil, nil, makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil) 448 } 449 450 configurationb, err := json.Marshal(c) 451 if err != nil { 452 return nil, nil, makeSystemError(computeSystem, operation, err, nil) 453 } 454 455 configuration := string(configurationb) 456 processInfo, processHandle, resultJSON, err := vmcompute.HcsCreateProcess(ctx, computeSystem.handle, configuration) 457 events := processHcsResult(ctx, resultJSON) 458 if err != nil { 459 return nil, nil, makeSystemError(computeSystem, operation, err, events) 460 } 461 462 log.G(ctx).WithField("pid", processInfo.ProcessId).Debug("created process pid") 463 return newProcess(processHandle, int(processInfo.ProcessId), computeSystem), &processInfo, nil 464} 465 466// CreateProcess launches a new process within the computeSystem. 467func (computeSystem *System) CreateProcess(ctx context.Context, c interface{}) (cow.Process, error) { 468 operation := "hcs::System::CreateProcess" 469 process, processInfo, err := computeSystem.createProcess(ctx, operation, c) 470 if err != nil { 471 return nil, err 472 } 473 defer func() { 474 if err != nil { 475 process.Close() 476 } 477 }() 478 479 pipes, err := makeOpenFiles([]syscall.Handle{processInfo.StdInput, processInfo.StdOutput, processInfo.StdError}) 480 if err != nil { 481 return nil, makeSystemError(computeSystem, operation, err, nil) 482 } 483 process.stdin = pipes[0] 484 process.stdout = pipes[1] 485 process.stderr = pipes[2] 486 process.hasCachedStdio = true 487 488 if err = process.registerCallback(ctx); err != nil { 489 return nil, makeSystemError(computeSystem, operation, err, nil) 490 } 491 go process.waitBackground() 492 493 return process, nil 494} 495 496// OpenProcess gets an interface to an existing process within the computeSystem. 497func (computeSystem *System) OpenProcess(ctx context.Context, pid int) (*Process, error) { 498 computeSystem.handleLock.RLock() 499 defer computeSystem.handleLock.RUnlock() 500 501 operation := "hcs::System::OpenProcess" 502 503 if computeSystem.handle == 0 { 504 return nil, makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil) 505 } 506 507 processHandle, resultJSON, err := vmcompute.HcsOpenProcess(ctx, computeSystem.handle, uint32(pid)) 508 events := processHcsResult(ctx, resultJSON) 509 if err != nil { 510 return nil, makeSystemError(computeSystem, operation, err, events) 511 } 512 513 process := newProcess(processHandle, pid, computeSystem) 514 if err = process.registerCallback(ctx); err != nil { 515 return nil, makeSystemError(computeSystem, operation, err, nil) 516 } 517 go process.waitBackground() 518 519 return process, nil 520} 521 522// Close cleans up any state associated with the compute system but does not terminate or wait for it. 523func (computeSystem *System) Close() (err error) { 524 operation := "hcs::System::Close" 525 ctx, span := trace.StartSpan(context.Background(), operation) 526 defer span.End() 527 defer func() { oc.SetSpanStatus(span, err) }() 528 span.AddAttributes(trace.StringAttribute("cid", computeSystem.id)) 529 530 computeSystem.handleLock.Lock() 531 defer computeSystem.handleLock.Unlock() 532 533 // Don't double free this 534 if computeSystem.handle == 0 { 535 return nil 536 } 537 538 if err = computeSystem.unregisterCallback(ctx); err != nil { 539 return makeSystemError(computeSystem, operation, err, nil) 540 } 541 542 err = vmcompute.HcsCloseComputeSystem(ctx, computeSystem.handle) 543 if err != nil { 544 return makeSystemError(computeSystem, operation, err, nil) 545 } 546 547 computeSystem.handle = 0 548 computeSystem.closedWaitOnce.Do(func() { 549 computeSystem.waitError = ErrAlreadyClosed 550 close(computeSystem.waitBlock) 551 }) 552 553 return nil 554} 555 556func (computeSystem *System) registerCallback(ctx context.Context) error { 557 callbackContext := ¬ificationWatcherContext{ 558 channels: newSystemChannels(), 559 systemID: computeSystem.id, 560 } 561 562 callbackMapLock.Lock() 563 callbackNumber := nextCallback 564 nextCallback++ 565 callbackMap[callbackNumber] = callbackContext 566 callbackMapLock.Unlock() 567 568 callbackHandle, err := vmcompute.HcsRegisterComputeSystemCallback(ctx, computeSystem.handle, notificationWatcherCallback, callbackNumber) 569 if err != nil { 570 return err 571 } 572 callbackContext.handle = callbackHandle 573 computeSystem.callbackNumber = callbackNumber 574 575 return nil 576} 577 578func (computeSystem *System) unregisterCallback(ctx context.Context) error { 579 callbackNumber := computeSystem.callbackNumber 580 581 callbackMapLock.RLock() 582 callbackContext := callbackMap[callbackNumber] 583 callbackMapLock.RUnlock() 584 585 if callbackContext == nil { 586 return nil 587 } 588 589 handle := callbackContext.handle 590 591 if handle == 0 { 592 return nil 593 } 594 595 // hcsUnregisterComputeSystemCallback has its own syncronization 596 // to wait for all callbacks to complete. We must NOT hold the callbackMapLock. 597 err := vmcompute.HcsUnregisterComputeSystemCallback(ctx, handle) 598 if err != nil { 599 return err 600 } 601 602 closeChannels(callbackContext.channels) 603 604 callbackMapLock.Lock() 605 delete(callbackMap, callbackNumber) 606 callbackMapLock.Unlock() 607 608 handle = 0 //nolint:ineffassign 609 610 return nil 611} 612 613// Modify the System by sending a request to HCS 614func (computeSystem *System) Modify(ctx context.Context, config interface{}) error { 615 computeSystem.handleLock.RLock() 616 defer computeSystem.handleLock.RUnlock() 617 618 operation := "hcs::System::Modify" 619 620 if computeSystem.handle == 0 { 621 return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil) 622 } 623 624 requestBytes, err := json.Marshal(config) 625 if err != nil { 626 return err 627 } 628 629 requestJSON := string(requestBytes) 630 resultJSON, err := vmcompute.HcsModifyComputeSystem(ctx, computeSystem.handle, requestJSON) 631 events := processHcsResult(ctx, resultJSON) 632 if err != nil { 633 return makeSystemError(computeSystem, operation, err, events) 634 } 635 636 return nil 637} 638