1package main
2
3import (
4	"context"
5	"runtime"
6	"strings"
7	"sync"
8	"sync/atomic"
9	"time"
10
11	"github.com/Microsoft/hcsshim/internal/oc"
12	"github.com/Microsoft/hcsshim/internal/shimdiag"
13	"github.com/containerd/containerd/errdefs"
14	"github.com/containerd/containerd/runtime/v2/task"
15	google_protobuf1 "github.com/gogo/protobuf/types"
16	"go.opencensus.io/trace"
17)
18
// cdevent pairs an event payload with the topic it belongs to.
//
// NOTE(review): presumably the unit handed to the service's publisher;
// confirm against the code that constructs and consumes it.
type cdevent struct {
	// topic is the topic string associated with the event.
	topic string
	// event is the payload; it is untyped and may hold any value.
	event interface{}
}
23
// Compile-time assertion that *service implements task.TaskService. The
// conversion fails to compile if a required method is missing.
var _ = (task.TaskService)(&service{})
25
// service is the shim's implementation of the task service. A single
// instance tracks either one task or one POD sandbox, identified by `tid`.
type service struct {
	// events is the publisher held by the service.
	//
	// NOTE(review): presumably used to publish task events; confirm against
	// the `publisher` definition.
	events publisher
	// tid is the original task id to be served. This can either be a single
	// task or represent the POD sandbox task id. The first call to Create MUST
	// match this id or the shim is considered to be invalid.
	//
	// This MUST be treated as readonly for the lifetime of the shim.
	tid string
	// isSandbox specifies if `tid` is a POD sandbox. If `false` the shim will
	// reject all calls to `Create` where `tid` does not match. If `true`
	// multiple calls to `Create` are allowed as long as the workload containers
	// all have the same parent task id.
	//
	// This MUST be treated as readonly for the lifetime of the shim.
	isSandbox bool

	// taskOrPod is either the `pod` this shim is tracking if `isSandbox ==
	// true` or it is the `task` this shim is tracking. If no call to `Create`
	// has taken place yet `taskOrPod.Load()` MUST return `nil`.
	taskOrPod atomic.Value

	// cl is the create lock. Since each shim MUST only track a single task or
	// POD, `cl` is held while creating that task or POD sandbox. It SHOULD
	// NOT be taken when creating tasks in a POD sandbox as those creations
	// can happen concurrently.
	cl sync.Mutex
}
53
54func (s *service) State(ctx context.Context, req *task.StateRequest) (resp *task.StateResponse, err error) {
55	defer panicRecover()
56	ctx, span := trace.StartSpan(ctx, "State")
57	defer span.End()
58	defer func() {
59		if resp != nil {
60			span.AddAttributes(
61				trace.StringAttribute("status", resp.Status.String()),
62				trace.Int64Attribute("exitStatus", int64(resp.ExitStatus)),
63				trace.StringAttribute("exitedAt", resp.ExitedAt.String()))
64		}
65		oc.SetSpanStatus(span, err)
66	}()
67
68	span.AddAttributes(
69		trace.StringAttribute("tid", req.ID),
70		trace.StringAttribute("eid", req.ExecID))
71
72	if s.isSandbox {
73		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
74	}
75
76	r, e := s.stateInternal(ctx, req)
77	return r, errdefs.ToGRPC(e)
78}
79
80func (s *service) Create(ctx context.Context, req *task.CreateTaskRequest) (resp *task.CreateTaskResponse, err error) {
81	defer panicRecover()
82	ctx, span := trace.StartSpan(ctx, "Create")
83	defer span.End()
84	defer func() {
85		if resp != nil {
86			span.AddAttributes(trace.Int64Attribute("pid", int64(resp.Pid)))
87		}
88		oc.SetSpanStatus(span, err)
89	}()
90
91	span.AddAttributes(
92		trace.StringAttribute("tid", req.ID),
93		trace.StringAttribute("bundle", req.Bundle),
94		// trace.StringAttribute("rootfs", req.Rootfs), TODO: JTERRY75 -
95		// OpenCensus doesnt support slice like our logrus hook
96		trace.BoolAttribute("terminal", req.Terminal),
97		trace.StringAttribute("stdin", req.Stdin),
98		trace.StringAttribute("stdout", req.Stdout),
99		trace.StringAttribute("stderr", req.Stderr),
100		trace.StringAttribute("checkpoint", req.Checkpoint),
101		trace.StringAttribute("parentcheckpoint", req.ParentCheckpoint))
102
103	if s.isSandbox {
104		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
105	}
106
107	r, e := s.createInternal(ctx, req)
108	return r, errdefs.ToGRPC(e)
109}
110
111func (s *service) Start(ctx context.Context, req *task.StartRequest) (resp *task.StartResponse, err error) {
112	defer panicRecover()
113	ctx, span := trace.StartSpan(ctx, "Start")
114	defer span.End()
115	defer func() {
116		if resp != nil {
117			span.AddAttributes(trace.Int64Attribute("pid", int64(resp.Pid)))
118		}
119		oc.SetSpanStatus(span, err)
120	}()
121
122	span.AddAttributes(
123		trace.StringAttribute("tid", req.ID),
124		trace.StringAttribute("eid", req.ExecID))
125
126	if s.isSandbox {
127		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
128	}
129
130	r, e := s.startInternal(ctx, req)
131	return r, errdefs.ToGRPC(e)
132}
133
134func (s *service) Delete(ctx context.Context, req *task.DeleteRequest) (resp *task.DeleteResponse, err error) {
135	defer panicRecover()
136	ctx, span := trace.StartSpan(ctx, "Delete")
137	defer span.End()
138	defer func() {
139		if resp != nil {
140			span.AddAttributes(
141				trace.Int64Attribute("pid", int64(resp.Pid)),
142				trace.Int64Attribute("exitStatus", int64(resp.ExitStatus)),
143				trace.StringAttribute("exitedAt", resp.ExitedAt.String()))
144		}
145		oc.SetSpanStatus(span, err)
146	}()
147
148	span.AddAttributes(
149		trace.StringAttribute("tid", req.ID),
150		trace.StringAttribute("eid", req.ExecID))
151
152	if s.isSandbox {
153		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
154	}
155
156	r, e := s.deleteInternal(ctx, req)
157	return r, errdefs.ToGRPC(e)
158}
159
160func (s *service) Pids(ctx context.Context, req *task.PidsRequest) (_ *task.PidsResponse, err error) {
161	defer panicRecover()
162	ctx, span := trace.StartSpan(ctx, "Pids")
163	defer span.End()
164	defer func() { oc.SetSpanStatus(span, err) }()
165
166	span.AddAttributes(trace.StringAttribute("tid", req.ID))
167
168	if s.isSandbox {
169		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
170	}
171
172	r, e := s.pidsInternal(ctx, req)
173	return r, errdefs.ToGRPC(e)
174}
175
176func (s *service) Pause(ctx context.Context, req *task.PauseRequest) (_ *google_protobuf1.Empty, err error) {
177	defer panicRecover()
178	ctx, span := trace.StartSpan(ctx, "Pause")
179	defer span.End()
180	defer func() { oc.SetSpanStatus(span, err) }()
181
182	span.AddAttributes(trace.StringAttribute("tid", req.ID))
183
184	if s.isSandbox {
185		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
186	}
187
188	r, e := s.pauseInternal(ctx, req)
189	return r, errdefs.ToGRPC(e)
190}
191
192func (s *service) Resume(ctx context.Context, req *task.ResumeRequest) (_ *google_protobuf1.Empty, err error) {
193	defer panicRecover()
194	ctx, span := trace.StartSpan(ctx, "Resume")
195	defer span.End()
196	defer func() { oc.SetSpanStatus(span, err) }()
197
198	span.AddAttributes(trace.StringAttribute("tid", req.ID))
199
200	if s.isSandbox {
201		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
202	}
203
204	r, e := s.resumeInternal(ctx, req)
205	return r, errdefs.ToGRPC(e)
206}
207
208func (s *service) Checkpoint(ctx context.Context, req *task.CheckpointTaskRequest) (_ *google_protobuf1.Empty, err error) {
209	defer panicRecover()
210	ctx, span := trace.StartSpan(ctx, "Checkpoint")
211	defer span.End()
212	defer func() { oc.SetSpanStatus(span, err) }()
213
214	span.AddAttributes(
215		trace.StringAttribute("tid", req.ID),
216		trace.StringAttribute("path", req.Path))
217
218	if s.isSandbox {
219		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
220	}
221
222	r, e := s.checkpointInternal(ctx, req)
223	return r, errdefs.ToGRPC(e)
224}
225
226func (s *service) Kill(ctx context.Context, req *task.KillRequest) (_ *google_protobuf1.Empty, err error) {
227	defer panicRecover()
228	ctx, span := trace.StartSpan(ctx, "Kill")
229	defer span.End()
230	defer func() { oc.SetSpanStatus(span, err) }()
231
232	span.AddAttributes(
233		trace.StringAttribute("tid", req.ID),
234		trace.StringAttribute("eid", req.ExecID),
235		trace.Int64Attribute("signal", int64(req.Signal)),
236		trace.BoolAttribute("all", req.All))
237
238	if s.isSandbox {
239		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
240	}
241
242	r, e := s.killInternal(ctx, req)
243	return r, errdefs.ToGRPC(e)
244}
245
246func (s *service) Exec(ctx context.Context, req *task.ExecProcessRequest) (_ *google_protobuf1.Empty, err error) {
247	defer panicRecover()
248	ctx, span := trace.StartSpan(ctx, "Exec")
249	defer span.End()
250	defer func() { oc.SetSpanStatus(span, err) }()
251
252	span.AddAttributes(
253		trace.StringAttribute("tid", req.ID),
254		trace.StringAttribute("eid", req.ExecID),
255		trace.BoolAttribute("terminal", req.Terminal),
256		trace.StringAttribute("stdin", req.Stdin),
257		trace.StringAttribute("stdout", req.Stdout),
258		trace.StringAttribute("stderr", req.Stderr))
259
260	if s.isSandbox {
261		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
262	}
263
264	r, e := s.execInternal(ctx, req)
265	return r, errdefs.ToGRPC(e)
266}
267
268func (s *service) DiagExecInHost(ctx context.Context, req *shimdiag.ExecProcessRequest) (_ *shimdiag.ExecProcessResponse, err error) {
269	defer panicRecover()
270	ctx, span := trace.StartSpan(ctx, "DiagExecInHost")
271	defer span.End()
272	defer func() { oc.SetSpanStatus(span, err) }()
273
274	span.AddAttributes(
275		trace.StringAttribute("args", strings.Join(req.Args, " ")),
276		trace.StringAttribute("workdir", req.Workdir),
277		trace.BoolAttribute("terminal", req.Terminal),
278		trace.StringAttribute("stdin", req.Stdin),
279		trace.StringAttribute("stdout", req.Stdout),
280		trace.StringAttribute("stderr", req.Stderr))
281
282	if s.isSandbox {
283		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
284	}
285
286	r, e := s.diagExecInHostInternal(ctx, req)
287	return r, errdefs.ToGRPC(e)
288}
289
290func (s *service) DiagShare(ctx context.Context, req *shimdiag.ShareRequest) (_ *shimdiag.ShareResponse, err error) {
291	defer panicRecover()
292	ctx, span := trace.StartSpan(ctx, "DiagShare")
293	defer span.End()
294	defer func() { oc.SetSpanStatus(span, err) }()
295
296	span.AddAttributes(
297		trace.StringAttribute("hostpath", req.HostPath),
298		trace.StringAttribute("uvmpath", req.UvmPath),
299		trace.BoolAttribute("readonly", req.ReadOnly))
300
301	if s.isSandbox {
302		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
303	}
304
305	r, e := s.diagShareInternal(ctx, req)
306	return r, errdefs.ToGRPC(e)
307}
308
309func (s *service) ResizePty(ctx context.Context, req *task.ResizePtyRequest) (_ *google_protobuf1.Empty, err error) {
310	defer panicRecover()
311	ctx, span := trace.StartSpan(ctx, "ResizePty")
312	defer span.End()
313	defer func() { oc.SetSpanStatus(span, err) }()
314
315	span.AddAttributes(
316		trace.StringAttribute("tid", req.ID),
317		trace.StringAttribute("eid", req.ExecID),
318		trace.Int64Attribute("width", int64(req.Width)),
319		trace.Int64Attribute("height", int64(req.Height)))
320
321	if s.isSandbox {
322		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
323	}
324
325	r, e := s.resizePtyInternal(ctx, req)
326	return r, errdefs.ToGRPC(e)
327}
328
329func (s *service) CloseIO(ctx context.Context, req *task.CloseIORequest) (_ *google_protobuf1.Empty, err error) {
330	defer panicRecover()
331	ctx, span := trace.StartSpan(ctx, "CloseIO")
332	defer span.End()
333	defer func() { oc.SetSpanStatus(span, err) }()
334
335	span.AddAttributes(
336		trace.StringAttribute("tid", req.ID),
337		trace.StringAttribute("eid", req.ExecID),
338		trace.BoolAttribute("stdin", req.Stdin))
339
340	if s.isSandbox {
341		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
342	}
343
344	r, e := s.closeIOInternal(ctx, req)
345	return r, errdefs.ToGRPC(e)
346}
347
348func (s *service) Update(ctx context.Context, req *task.UpdateTaskRequest) (_ *google_protobuf1.Empty, err error) {
349	defer panicRecover()
350	ctx, span := trace.StartSpan(ctx, "Update")
351	defer span.End()
352	defer func() { oc.SetSpanStatus(span, err) }()
353
354	span.AddAttributes(trace.StringAttribute("tid", req.ID))
355
356	if s.isSandbox {
357		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
358	}
359
360	r, e := s.updateInternal(ctx, req)
361	return r, errdefs.ToGRPC(e)
362}
363
364func (s *service) Wait(ctx context.Context, req *task.WaitRequest) (resp *task.WaitResponse, err error) {
365	defer panicRecover()
366	ctx, span := trace.StartSpan(ctx, "Wait")
367	defer span.End()
368	defer func() {
369		if resp != nil {
370			span.AddAttributes(
371				trace.Int64Attribute("exitStatus", int64(resp.ExitStatus)),
372				trace.StringAttribute("exitedAt", resp.ExitedAt.String()))
373		}
374		oc.SetSpanStatus(span, err)
375	}()
376
377	span.AddAttributes(
378		trace.StringAttribute("tid", req.ID),
379		trace.StringAttribute("eid", req.ExecID))
380
381	if s.isSandbox {
382		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
383	}
384
385	r, e := s.waitInternal(ctx, req)
386	return r, errdefs.ToGRPC(e)
387}
388
389func (s *service) Stats(ctx context.Context, req *task.StatsRequest) (_ *task.StatsResponse, err error) {
390	defer panicRecover()
391	ctx, span := trace.StartSpan(ctx, "Stats")
392	defer span.End()
393	defer func() { oc.SetSpanStatus(span, err) }()
394
395	span.AddAttributes(trace.StringAttribute("tid", req.ID))
396
397	if s.isSandbox {
398		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
399	}
400
401	r, e := s.statsInternal(ctx, req)
402	return r, errdefs.ToGRPC(e)
403}
404
405func (s *service) Connect(ctx context.Context, req *task.ConnectRequest) (resp *task.ConnectResponse, err error) {
406	defer panicRecover()
407	ctx, span := trace.StartSpan(ctx, "Connect")
408	defer span.End()
409	defer func() {
410		if resp != nil {
411			span.AddAttributes(
412				trace.Int64Attribute("shimPid", int64(resp.ShimPid)),
413				trace.Int64Attribute("taskPid", int64(resp.TaskPid)),
414				trace.StringAttribute("version", resp.Version))
415		}
416		oc.SetSpanStatus(span, err)
417	}()
418
419	span.AddAttributes(trace.StringAttribute("tid", req.ID))
420
421	if s.isSandbox {
422		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
423	}
424
425	r, e := s.connectInternal(ctx, req)
426	return r, errdefs.ToGRPC(e)
427}
428
429func (s *service) Shutdown(ctx context.Context, req *task.ShutdownRequest) (_ *google_protobuf1.Empty, err error) {
430	defer panicRecover()
431	ctx, span := trace.StartSpan(ctx, "Shutdown")
432	defer span.End()
433	defer func() { oc.SetSpanStatus(span, err) }()
434
435	span.AddAttributes(trace.StringAttribute("tid", req.ID))
436
437	if s.isSandbox {
438		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
439	}
440
441	r, e := s.shutdownInternal(ctx, req)
442	return r, errdefs.ToGRPC(e)
443}
444
445func (s *service) DiagStacks(ctx context.Context, req *shimdiag.StacksRequest) (*shimdiag.StacksResponse, error) {
446	if s == nil {
447		return nil, nil
448	}
449	defer panicRecover()
450	ctx, span := trace.StartSpan(ctx, "DiagStacks")
451	defer span.End()
452
453	span.AddAttributes(trace.StringAttribute("tid", s.tid))
454
455	if s.isSandbox {
456		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
457	}
458
459	buf := make([]byte, 4096)
460	for {
461		buf = buf[:runtime.Stack(buf, true)]
462		if len(buf) < cap(buf) {
463			break
464		}
465		buf = make([]byte, 2*len(buf))
466	}
467	resp := &shimdiag.StacksResponse{Stacks: string(buf)}
468
469	t, _ := s.getTask(s.tid)
470	if t != nil {
471		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
472		defer cancel()
473		resp.GuestStacks = t.DumpGuestStacks(ctx)
474	}
475	return resp, nil
476}
477