1/*
2Copyright 2016 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package stats
18
19import (
20	"fmt"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	v1 "k8s.io/api/core/v1"
26	"k8s.io/apimachinery/pkg/util/wait"
27	utilfeature "k8s.io/apiserver/pkg/util/feature"
28	"k8s.io/client-go/tools/record"
29	"k8s.io/klog/v2"
30	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
31	"k8s.io/kubernetes/pkg/features"
32	"k8s.io/kubernetes/pkg/volume"
33	"k8s.io/kubernetes/pkg/volume/util"
34)
35
36// volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result
37type volumeStatCalculator struct {
38	statsProvider Provider
39	jitterPeriod  time.Duration
40	pod           *v1.Pod
41	stopChannel   chan struct{}
42	startO        sync.Once
43	stopO         sync.Once
44	latest        atomic.Value
45	eventRecorder record.EventRecorder
46}
47
48// PodVolumeStats encapsulates the VolumeStats for a pod.
49// It consists of two lists, for local ephemeral volumes, and for persistent volumes respectively.
50type PodVolumeStats struct {
51	EphemeralVolumes  []stats.VolumeStats
52	PersistentVolumes []stats.VolumeStats
53}
54
55// newVolumeStatCalculator creates a new VolumeStatCalculator
56func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod, eventRecorder record.EventRecorder) *volumeStatCalculator {
57	return &volumeStatCalculator{
58		statsProvider: statsProvider,
59		jitterPeriod:  jitterPeriod,
60		pod:           pod,
61		stopChannel:   make(chan struct{}),
62		eventRecorder: eventRecorder,
63	}
64}
65
66// StartOnce starts pod volume calc that will occur periodically in the background until s.StopOnce is called
67func (s *volumeStatCalculator) StartOnce() *volumeStatCalculator {
68	s.startO.Do(func() {
69		go wait.JitterUntil(func() {
70			s.calcAndStoreStats()
71		}, s.jitterPeriod, 1.0, true, s.stopChannel)
72	})
73	return s
74}
75
76// StopOnce stops background pod volume calculation.  Will not stop a currently executing calculations until
77// they complete their current iteration.
78func (s *volumeStatCalculator) StopOnce() *volumeStatCalculator {
79	s.stopO.Do(func() {
80		close(s.stopChannel)
81	})
82	return s
83}
84
85// getLatest returns the most recent PodVolumeStats from the cache
86func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) {
87	result := s.latest.Load()
88	if result == nil {
89		return PodVolumeStats{}, false
90	}
91	return result.(PodVolumeStats), true
92}
93
94// calcAndStoreStats calculates PodVolumeStats for a given pod and writes the result to the s.latest cache.
95// If the pod references PVCs, the prometheus metrics for those are updated with the result.
96func (s *volumeStatCalculator) calcAndStoreStats() {
97	// Find all Volumes for the Pod
98	volumes, found := s.statsProvider.ListVolumesForPod(s.pod.UID)
99	blockVolumes, bvFound := s.statsProvider.ListBlockVolumesForPod(s.pod.UID)
100	if !found && !bvFound {
101		return
102	}
103
104	metricVolumes := make(map[string]volume.MetricsProvider)
105
106	if found {
107		for name, v := range volumes {
108			metricVolumes[name] = v
109		}
110	}
111	if bvFound {
112		for name, v := range blockVolumes {
113			// Only add the blockVolume if it implements the MetricsProvider interface
114			if _, ok := v.(volume.MetricsProvider); ok {
115				// Some drivers inherit the MetricsProvider interface from Filesystem
116				// mode volumes, but do not implement it for Block mode. Checking
117				// SupportsMetrics() will prevent panics in that case.
118				if v.SupportsMetrics() {
119					metricVolumes[name] = v
120				}
121			}
122		}
123	}
124
125	// Get volume specs for the pod - key'd by volume name
126	volumesSpec := make(map[string]v1.Volume)
127	for _, v := range s.pod.Spec.Volumes {
128		volumesSpec[v.Name] = v
129	}
130
131	// Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats
132	var ephemeralStats []stats.VolumeStats
133	var persistentStats []stats.VolumeStats
134	for name, v := range metricVolumes {
135		metric, err := v.GetMetrics()
136		if err != nil {
137			// Expected for Volumes that don't support Metrics
138			if !volume.IsNotSupported(err) {
139				klog.V(4).InfoS("Failed to calculate volume metrics", "pod", klog.KObj(s.pod), "podUID", s.pod.UID, "volumeName", name, "err", err)
140			}
141			continue
142		}
143		// Lookup the volume spec and add a 'PVCReference' for volumes that reference a PVC
144		volSpec := volumesSpec[name]
145		var pvcRef *stats.PVCReference
146		if pvcSource := volSpec.PersistentVolumeClaim; pvcSource != nil {
147			pvcRef = &stats.PVCReference{
148				Name:      pvcSource.ClaimName,
149				Namespace: s.pod.GetNamespace(),
150			}
151		}
152		volumeStats := s.parsePodVolumeStats(name, pvcRef, metric, volSpec)
153		if util.IsLocalEphemeralVolume(volSpec) {
154			ephemeralStats = append(ephemeralStats, volumeStats)
155		} else {
156			persistentStats = append(persistentStats, volumeStats)
157		}
158
159		if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
160			if metric.Abnormal != nil && metric.Message != nil && (*metric.Abnormal) {
161				s.eventRecorder.Event(s.pod, v1.EventTypeWarning, "VolumeConditionAbnormal", fmt.Sprintf("Volume %s: %s", name, *metric.Message))
162			}
163		}
164	}
165
166	// Store the new stats
167	s.latest.Store(PodVolumeStats{EphemeralVolumes: ephemeralStats,
168		PersistentVolumes: persistentStats})
169}
170
171// parsePodVolumeStats converts (internal) volume.Metrics to (external) stats.VolumeStats structures
172func (s *volumeStatCalculator) parsePodVolumeStats(podName string, pvcRef *stats.PVCReference, metric *volume.Metrics, volSpec v1.Volume) stats.VolumeStats {
173
174	var available, capacity, used, inodes, inodesFree, inodesUsed uint64
175	if metric.Available != nil {
176		available = uint64(metric.Available.Value())
177	}
178	if metric.Capacity != nil {
179		capacity = uint64(metric.Capacity.Value())
180	}
181	if metric.Used != nil {
182		used = uint64(metric.Used.Value())
183	}
184	if metric.Inodes != nil {
185		inodes = uint64(metric.Inodes.Value())
186	}
187	if metric.InodesFree != nil {
188		inodesFree = uint64(metric.InodesFree.Value())
189	}
190	if metric.InodesUsed != nil {
191		inodesUsed = uint64(metric.InodesUsed.Value())
192	}
193
194	return stats.VolumeStats{
195		Name:   podName,
196		PVCRef: pvcRef,
197		FsStats: stats.FsStats{Time: metric.Time, AvailableBytes: &available, CapacityBytes: &capacity,
198			UsedBytes: &used, Inodes: &inodes, InodesFree: &inodesFree, InodesUsed: &inodesUsed},
199	}
200}
201