Fix 'the entire path of the pod is not cleaned up after deleting it' on sylixos

wanglan 2024-10-11 11:09:42 +08:00
parent 0b26f239ff
commit 3cff2e172d
17 changed files with 272 additions and 84 deletions

View File

@ -30,11 +30,13 @@ import (
"net/http"
"os"
"path/filepath"
"reflect"
goruntime "runtime"
"strconv"
"strings"
"time"
"github.com/coreos/go-systemd/v22/daemon"
jsonpatch "github.com/evanphx/json-patch"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
@ -309,8 +311,6 @@ is checked every 20 seconds (also configurable with a flag).`,
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
})
// make pid
return cmd
}
@ -493,7 +493,7 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
// not be generated.
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
klog.InfoStdf("GOMAXPROCS will reset to 1. older GOMAXPROCS=%d NumCPU=%d", goruntime.GOMAXPROCS(-1), goruntime.NumCPU())
goruntime.GOMAXPROCS(1)
//goruntime.GOMAXPROCS(1)
// To help debugging, immediately log version
klog.InfoS("Kubelet version", "kubeletVersion", version.Get())
@ -537,6 +537,7 @@ func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
fmt.Printf("Start makeEventRecorder %v\n", kubeDeps.Recorder == nil)
if kubeDeps.Recorder != nil {
return
}
@ -544,11 +545,12 @@ func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName)
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
eventBroadcaster.StartStructuredLogging(3)
if kubeDeps.EventClient != nil {
klog.V(4).InfoS("Sending events to api server")
klog.V(4).InfoStdS("Sending events to api server")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
} else {
klog.InfoS("No api server defined - no events will be sent to API server")
}
fmt.Printf("End makeEventRecorder %v %v\n", kubeDeps.Recorder == nil, reflect.TypeOf(kubeDeps.Recorder)) // *record.recorderImplLogger
}
func getReservedCPUs(machineInfo *cadvisorapi.MachineInfo, cpus string) (cpuset.CPUSet, error) {
@ -878,10 +880,10 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}
// TODO(vmarmol): Do this through container config.
//oomAdjuster := kubeDeps.OOMAdjuster
//if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
// klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
//}
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
}
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
@ -904,7 +906,9 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}
// If systemd is used, notify it that we have started
//go daemon.SdNotify(false, "READY=1") // github.com/coreos/go-systemd/v22/daemon
if goruntime.GOOS != "sylixos" {
go daemon.SdNotify(false, "READY=1") // github.com/coreos/go-systemd/v22/daemon
}
select {
case <-done:
@ -1308,7 +1312,7 @@ func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubele
}
// start the podresources grpc service
//go k.ListenAndServePodResources()
go k.ListenAndServePodResources()
}
func createAndInitKubelet(kubeServer *options.KubeletServer,

View File

@ -308,7 +308,7 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
if kubeDeps.KubeClient != nil {
klog.InfoS("Adding apiserver pod source")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource)) // start watching the apiserver source and call listen -> merge
}
return cfg, nil
}
@ -328,7 +328,7 @@ func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
return err
}
// UsingLegacyCadvisorStats returns true if container stats are provided by cadvisor rather than through the CRI
//kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(kubeCfg.ContainerRuntimeEndpoint)
return nil
@ -393,7 +393,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
}
kubeInformers.Start(wait.NeverStop)
klog.InfoS("Attempting to sync node with API server") // sylixos下没走这一行
klog.InfoS("Attempting to sync node with API server")
} else {
// we don't have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
@ -470,7 +470,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
Namespace: "",
}
oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder) // TODO: may need adaptation
oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder)
if err != nil {
if libcontaineruserns.RunningInUserNS() {
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletInUserNamespace) {
@ -1440,13 +1440,16 @@ func (kl *Kubelet) setupDataDirs() error {
// StartGarbageCollection starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
klog.V(4).InfoS("Starting kubelet garbage collector")
klog.V(4).InfoStdS("Starting kubelet garbage collector")
loggedContainerGCFailure := false
go wait.Until(func() {
klog.V(3).InfoStdf("Starting container garbage collection... %v", loggedContainerGCFailure)
ctx := context.Background()
if err := kl.containerGC.GarbageCollect(ctx); err != nil {
klog.ErrorS(err, "Container garbage collection failed")
klog.ErrorStdS(err, "Container garbage collection failed and loggedContainerGCFailure setting to true")
klog.V(2).InfoStdf("Start recorder event for container garbage collection failed")
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
klog.V(2).InfoStdf("End recorder event for container garbage collection failed")
loggedContainerGCFailure = true
} else {
var vLevel klog.Level = 4
@ -1455,9 +1458,10 @@ func (kl *Kubelet) StartGarbageCollection() {
loggedContainerGCFailure = false
}
klog.V(vLevel).InfoS("Container garbage collection succeeded")
klog.V(vLevel).InfoStdS("Container garbage collection succeeded")
}
}, ContainerGCPeriod, wait.NeverStop)
klog.V(3).InfoStdf("End container garbage collection %v", loggedContainerGCFailure)
}, ContainerGCPeriod, wait.NeverStop) // ContainerGCPeriod
// when the high threshold is set to 100, stub the image GC manager
if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
@ -1467,14 +1471,17 @@ func (kl *Kubelet) StartGarbageCollection() {
prevImageGCFailed := false
go wait.Until(func() {
klog.V(3).InfoStdf("Starting image garbage collection... %v", prevImageGCFailed)
ctx := context.Background()
if err := kl.imageManager.GarbageCollect(ctx); err != nil {
if prevImageGCFailed {
klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
klog.ErrorStdS(err, "Image garbage collection failed multiple times in a row")
// Only create an event for repeated failures
klog.V(2).InfoStdf("Start recorder event for image garbage collection failed")
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
klog.V(2).InfoStdf("End recorder event for image garbage collection failed")
} else {
klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
klog.ErrorStdS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
}
prevImageGCFailed = true
} else {
@ -1484,8 +1491,9 @@ func (kl *Kubelet) StartGarbageCollection() {
prevImageGCFailed = false
}
klog.V(vLevel).InfoS("Image garbage collection succeeded")
klog.V(vLevel).InfoStdS("Image garbage collection succeeded")
}
klog.V(3).InfoStdf("End image garbage collection %v", prevImageGCFailed)
}, ImageGCPeriod, wait.NeverStop)
}
@ -1596,6 +1604,8 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
// The shutdown manager is not critical for kubelet, so log failure, but don't block Kubelet startup if there was a failure starting it.
klog.ErrorS(err, "Failed to start node shutdown manager")
}
klog.InfoStdln("End initialize runtime dependent modules")
}
// Run starts the kubelet reacting to config updates
@ -1657,7 +1667,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}
// Start volume manager
klog.InfoStdln("Starting volume manager ", kl.sourcesReady)
klog.InfoStdf("Starting volume manager %+v", kl.sourcesReady)
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
@ -1670,13 +1680,14 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Introduce some small jittering to ensure that over time the requests won't start
// accumulating at approximately the same time from the set of nodes due to priority and
// fairness effect.
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// start syncing lease
go kl.nodeLeaseController.Run(context.Background())
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) // originally 5s
// Set up iptables util rules
if kl.makeIPTablesUtilChains {
@ -1687,7 +1698,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}
}
klog.Infoln("Start run statusManager")
klog.InfoStdln("Start run statusManager")
// Start component sync loops.
kl.statusManager.Start()
@ -1758,7 +1769,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// the most accurate information possible about an error situation to aid debugging.
// Callers should not write an event if this operation returns an error.
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
fmt.Printf("start SyncPod...updateType=%v, pod.Name=%s, podStatus=%+v\n", updateType, pod.Name, *podStatus)
klog.V(5).InfoStdf("Kubelet SyncPod 1 podName=%s podName=%s SandboxStatusesLength=%d ContainerStatusesLength=%d", podStatus.Name, pod.Name, len(podStatus.SandboxStatuses), len(podStatus.ContainerStatuses))
ctx, otelSpan := kl.tracer.Start(ctx, "syncPod", trace.WithAttributes(
semconv.K8SPodUIDKey.String(string(pod.UID)),
attribute.String("k8s.pod", klog.KObj(pod).String()),
@ -1795,6 +1806,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
klog.V(5).InfoStdf("Kubelet SyncPod 2 podName=%s SandboxStatusesLength=%d ContainerStatusesLength=%d", podStatus.Name, len(podStatus.SandboxStatuses), len(podStatus.ContainerStatuses))
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
@ -1847,7 +1859,8 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
klog.V(5).InfoStdf("Kubelet SyncPod 3 podName=%s SandboxStatusesLength=%d ContainerStatusesLength=%d runnable.Admit=%v",
podStatus.Name, len(podStatus.SandboxStatuses), len(podStatus.ContainerStatuses), runnable.Admit)
// Pods that are not runnable must be stopped - return a typed error to the pod worker
if !runnable.Admit {
klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
@ -1916,6 +1929,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
}
}
klog.V(5).InfoStdf("Kubelet SyncPod 2 podName=%s SandboxStatusesLength=%d ContainerStatusesLength=%d", podStatus.Name, len(podStatus.SandboxStatuses), len(podStatus.ContainerStatuses))
// Create and Update pod's Cgroups
// Don't create cgroups for run once pod if it was killed above
// The current policy is not to restart the run once pods when
@ -1968,7 +1982,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
}
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
if err := kl.makePodDataDirs(pod); err != nil { // create the pod's subdirectories (volumes, plugins, etc.)
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
return false, err
@ -2042,6 +2056,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
// configuration and the kubelet is restarted - SyncTerminatingRuntimePod handles those orphaned
// pods.
func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
klog.V(2).InfoStdf("SyncTerminatingPod running %s", pod.Name)
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
// Currently, using that context causes test failures.
ctx, otelSpan := kl.tracer.Start(context.Background(), "syncTerminatingPod", trace.WithAttributes(
@ -2051,24 +2066,27 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
semconv.K8SNamespaceNameKey.String(pod.Namespace),
))
defer otelSpan.End()
klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
klog.V(4).InfoStdS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoStdS("SyncTerminatingPod exit", "podName", pod.Name, "podUID", pod.UID)
//defer klog.V(4).InfoStdS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
if podStatusFn != nil {
podStatusFn(&apiPodStatus)
}
klog.V(2).InfoStdf("SetPodStatus-1 podName=%s, podUID=%s, apiPodStatus=%+v; gracePeriod is %v", pod.Name, pod.UID, apiPodStatus, gracePeriod == nil)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
if gracePeriod != nil {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
klog.V(4).InfoStdS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
} else {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
klog.V(4).InfoStdS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
}
kl.probeManager.StopLivenessAndStartup(pod)
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
klog.V(2).InfoStdf("Killing pod %s", pod.Name)
if err := kl.killPod(ctx, pod, p, gracePeriod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
// there was an error killing the pod, so we return that error directly
@ -2080,6 +2098,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
// TODO: once a pod is terminal, certain probes (liveness exec) could be stopped immediately after
// the detection of a container shutdown or (for readiness) after the first failure. Tracked as
// https://github.com/kubernetes/kubernetes/issues/107894 although may not be worth optimizing.
klog.V(2).InfoStdf("Removing Pod %s", pod.Name)
kl.probeManager.RemovePod(pod)
// Guard against consistency issues in KillPod implementations by checking that there are no
@ -2112,7 +2131,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
}
if klogVEnabled {
sort.Slice(containers, func(i, j int) bool { return containers[i].Name < containers[j].Name })
klog.V(4).InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", containers)
klog.V(4).InfoStdS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", containers)
}
if len(runningContainers) > 0 {
return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers)
@ -2132,10 +2151,11 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
// information about the container end states (including exit codes) - when
// SyncTerminatedPod is called the containers may already be removed.
apiPodStatus = kl.generateAPIPodStatus(pod, podStatus, true)
klog.V(2).InfoStdf("SetPodStatus-2 podName=%s, podUID=%s, apiPodStatus=%+v", pod.Name, pod.UID, apiPodStatus)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// we have successfully stopped all containers, the pod is terminating, our status is "done"
klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
klog.V(4).InfoStdS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
@ -2200,7 +2220,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
if err := kl.volumeManager.WaitForUnmount(ctx, pod); err != nil {
return err
}
klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
klog.V(4).InfoStdS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
if !kl.keepTerminatedPodVolumes {
// This waiting loop relies on the background cleanup which starts after pod workers respond
@ -2208,13 +2228,13 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
volumesExist := kl.podVolumesExist(pod.UID)
if volumesExist {
klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
klog.V(3).InfoStdS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
}
return !volumesExist, nil
}); err != nil {
return err
}
klog.V(3).InfoS("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID)
klog.V(3).InfoStdS("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID)
}
// After volume unmount is complete, let the secret and configmap managers know we're done with this pod
@ -2236,14 +2256,14 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
if err := pcm.Destroy(name); err != nil {
return err
}
klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
klog.V(4).InfoStdS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
}
kl.usernsManager.Release(pod.UID)
// mark the final pod status
kl.statusManager.TerminatePod(pod)
klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)
klog.V(4).InfoStdS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
@ -2369,18 +2389,18 @@ func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
klog.InfoStdS("Starting kubelet main sync loop")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second * 2) // TODO originally 1s
syncTicker := time.NewTicker(time.Second * 1)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
base = 100 * time.Millisecond // originally 100 * time.Millisecond
max = 5 * time.Second // originally 5 * time.Second
factor = 2
)
duration := base
@ -2393,7 +2413,7 @@ func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpd
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
klog.ErrorStdS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
@ -2401,12 +2421,13 @@ func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpd
}
// reset backoff if we have a success
duration = base
//fmt.Println("Ready to syncLoopIteration...")
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
//fmt.Println("syncLoop once end.")
}
}
@ -2444,13 +2465,13 @@ func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpd
// containers have failed health checks
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
//fmt.Println("syncLoopIteration exec ...")
klog.V(7).InfoStdln("syncLoopIteration exec ...")
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
klog.ErrorStdS(nil, "Update channel is closed, exiting the sync loop")
return false
}
@ -2532,21 +2553,23 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
}
handleProbeSync(kl, update, handler, "startup", status)
case <-housekeepingCh:
klog.V(9).InfoStdf("SyncLoop (housekeeping) start")
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
klog.V(4).InfoStdln("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(ctx); err != nil {
klog.V(9).InfoStdln("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(ctx); err != nil { // clean up pod paths
klog.ErrorS(err, "Failed cleaning pods")
}
klog.V(9).InfoStdf("Cleanups Pod when housekeeping success")
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than expected", "expected", housekeepingWarningDuration, "actual", duration.Round(time.Millisecond))
klog.ErrorStdS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than expected", "expected", housekeepingWarningDuration, "actual", duration.Round(time.Millisecond))
}
klog.V(4).InfoS("SyncLoop (housekeeping) end", "duration", duration.Round(time.Millisecond))
klog.V(9).InfoStdS("SyncLoop (housekeeping) end", "duration", duration.Round(time.Millisecond))
}
}
return true
@ -2557,10 +2580,10 @@ func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandle
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
klog.V(4).InfoStdS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
return
}
klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
klog.V(1).InfoStdS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
}
@ -2912,29 +2935,32 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
// and returns an error if the status check fails. If the status check is OK,
// update the container runtime uptime in the kubelet runtimeState.
func (kl *Kubelet) updateRuntimeUp() {
klog.InfoStdln("Updating container runtime status")
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()
ctx := context.Background()
klog.InfoStdf("Start updating container runtime status")
ctx := context.Background()
s, err := kl.containerRuntime.Status(ctx)
if err != nil {
klog.ErrorS(err, "Container runtime sanity check failed")
klog.ErrorStdf("Container runtime sanity check failed: %v", err)
return
}
if s == nil {
klog.ErrorS(nil, "Container runtime status is nil")
klog.ErrorStdS(nil, "Container runtime status is nil")
return
}
// Periodically log the whole runtime status for debugging.
klog.V(4).InfoS("Container runtime status", "status", s)
klog.V(2).InfoStdf("Container runtime status is {%v}", s.String())
klogErrorS := klog.ErrorS
if !kl.containerRuntimeReadyExpected {
klogErrorS = klog.V(4).ErrorS
klogErrorS = klog.V(4).ErrorStdS
}
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
klog.V(2).InfoStdS("Getting runtime network condition", "containerRuntimeReadyExpected", kl.containerRuntimeReadyExpected, "networkReady", networkReady)
if networkReady == nil || !networkReady.Status {
klogErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
klogErrorS(nil, "Container runtime network not ready", "networkReady", networkReady, "networkReady.Status", networkReady.Status)
kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
@ -2942,6 +2968,7 @@ func (kl *Kubelet) updateRuntimeUp() {
}
// information in RuntimeReady condition will be propagated to NodeReady condition.
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
klog.V(2).InfoStdS("Getting runtime condition", "runtimeReady", runtimeReady)
// If RuntimeReady is not set or is false, report an error.
if runtimeReady == nil || !runtimeReady.Status {
klogErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
@ -2949,8 +2976,12 @@ func (kl *Kubelet) updateRuntimeUp() {
return
}
kl.runtimeState.setRuntimeState(nil)
klog.V(2).InfoStdln("Set runtime and network condition end")
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
klog.InfoStdln("End updating container runtime status")
}
// GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
@ -2961,6 +2992,7 @@ func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration
// BirthCry sends an event that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
// Make an event that kubelet restarted.
klog.V(2).InfoStdf("Recording event for kubelet restarted")
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
}
@ -2982,7 +3014,7 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
// ListenAndServePodResources runs the kubelet podresources grpc service
func (kl *Kubelet) ListenAndServePodResources() {
klog.Infof("kubelet.ListenAndServePodResources start listen %s", kl.getPodResourcesDir())
klog.InfoStdf("kubelet.ListenAndServePodResources start listen %s", kl.getPodResourcesDir())
endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
if err != nil {
fmt.Printf("Kubelet LocalEndpoint failed %v\n", err)
@ -2991,8 +3023,7 @@ func (kl *Kubelet) ListenAndServePodResources() {
}
//endpoint := "unix:///media/nvme1/var/lib/kubelet/pod-resources/kubelet.sock" // modify the path temporarily
fmt.Printf("kubelet.ListenAndServePodResources serving %s; GOOS=%s\n", endpoint, sysruntime.GOOS) // unix:/var/lib/kubelet/pod-resources/kubelet.sock
klog.Infof("kubelet.ListenAndServePodResources serving %s; GOOS=%s", endpoint, sysruntime.GOOS)
klog.InfoStdf("kubelet.ListenAndServePodResources serving %s; GOOS=%s", endpoint, sysruntime.GOOS) // unix:/var/lib/kubelet/pod-resources/kubelet.sock
providers := podresources.PodResourcesProviders{
Pods: kl.podManager,
@ -3032,7 +3063,7 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
if kl.fastNodeStatusUpdate(ctx, kl.clock.Since(start) >= nodeReadyGracePeriod) {
close(stopCh)
}
}, 200*time.Millisecond, stopCh) // temporarily set 100 -> 200
}, 100*time.Millisecond, stopCh) // originally 100*time.Millisecond
}
// CheckpointContainer tries to checkpoint a container. The parameters are used to

View File

@ -1035,6 +1035,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
pcm := kl.containerManager.NewPodContainerManager()
cgroupPods, err = pcm.GetAllPodsFromCgroups()
if err != nil {
klog.V(3).InfoStdf("failed to get pods from cgroups: %v", err)
return fmt.Errorf("failed to get list of pods that still exist on cgroup mounts: %v", err)
}
}
@ -1053,7 +1054,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// These two conditions could be alleviated by checkpointing kubelet.
// Stop the workers for terminated pods not in the config source
klog.V(9).InfoS("Clean up pod workers for terminated pods")
klog.V(9).InfoStdf("Clean up pod workers for terminated pods allPods=%d, mirrorPods=%d, orphanedMirrorPodFullnames=%d", len(allPods), len(mirrorPods), len(orphanedMirrorPodFullnames))
workingPods := kl.podWorkers.SyncKnownPods(allPods)
// Reconcile: At this point the pod workers have been pruned to the set of
@ -1062,6 +1063,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
allPodsByUID := make(map[types.UID]*v1.Pod)
for _, pod := range allPods {
//klog.V(9).InfoStdf("Looking allPods [%d] podName=%s, uid=%s", i+1, pod.Name, pod.UID)
allPodsByUID[pod.UID] = pod
}
@ -1085,6 +1087,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
possiblyRunningPods[uid] = struct{}{}
case TerminatingPod:
possiblyRunningPods[uid] = struct{}{}
klog.V(9).InfoStdf("\tLooking TerminatingPod uid=%s", uid)
default:
}
}

View File

@ -123,6 +123,7 @@ func (kl *Kubelet) removeOrphanedPodVolumeDirs(uid types.UID) []error {
// If there are still volume directories, attempt to rmdir them
volumePaths, err := kl.getPodVolumePathListFromDisk(uid)
klog.V(4).Infof("removeOrphanedPodVolumeDirs %v for uid=%s", volumePaths, uid)
if err != nil {
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but error occurred during reading volume dir from disk: %v", uid, err))
return orphanVolumeErrors
@ -157,7 +158,8 @@ func (kl *Kubelet) removeOrphanedPodVolumeDirs(uid types.UID) []error {
// Remove any remaining subdirectories along with the volumes directory itself.
// Fail if any regular files are encountered.
podVolDir := kl.getPodVolumesDir(uid)
if err := removeall.RemoveDirsOneFilesystem(kl.mounter, podVolDir); err != nil {
klog.V(4).Infof("removeOrphanedPodVolumeDirs podVolDir %s", podVolDir)
if err := removeall.RemoveDirsOneFilesystem(kl.mounter, podVolDir); err != nil { // clean up the volumes path under the pod
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but error occurred when trying to remove the volumes dir: %v", uid, err))
} else {
klog.InfoS("Cleaned up orphaned pod volumes dir", "podUID", uid, "path", podVolDir)
@ -230,6 +232,7 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon
for _, podSubdir := range podSubdirs {
podSubdirName := podSubdir.Name()
podSubdirPath := filepath.Join(podDir, podSubdirName)
klog.V(4).InfoS("Removing pod subdir", "podUID", uid, "podSubdirPath", podSubdirPath, "podSubdir", podSubdir)
// Never attempt RemoveAllOneFilesystem on the volumes directory,
// as this could lead to data loss in some situations. The volumes
// directory should have been removed by removeOrphanedPodVolumeDirs.
@ -243,12 +246,14 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon
cleanupFailed = true
klog.ErrorS(err, "Failed to remove orphaned pod subdir", "podUID", uid, "path", podSubdirPath)
orphanRemovalErrors = append(orphanRemovalErrors, fmt.Errorf("orphaned pod %q found, but error occurred when trying to remove subdir %q: %v", uid, podSubdirPath, err))
} else {
klog.InfoS("Cleaned up orphaned pod subdir success", "podUID", uid, "path", podSubdirPath)
}
}
// Rmdir the pod dir, which should be empty if everything above was successful
klog.V(3).InfoS("Orphaned pod found, removing", "podUID", uid)
if err := syscall.Rmdir(podDir); err != nil {
klog.V(3).InfoS("Orphaned pod found, removing", "podUID", uid, "podDir", podDir)
if err := syscall.Rmdir(podDir); err != nil { // remove the pod path, e.g. /var/lib/kubelet/pods/568e990c-89b8-4340-a919-aab038bc7f9f
cleanupFailed = true
klog.ErrorS(err, "Failed to remove orphaned pod dir", "podUID", uid)
orphanRemovalErrors = append(orphanRemovalErrors, fmt.Errorf("orphaned pod %q found, but error occurred when trying to remove the pod directory: %v", uid, err))
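For orientation (an illustration, not part of the commit; the exact entries vary per pod), the syscall.Rmdir above can only succeed once the pod directory is already empty. Typical entries and the code paths that clear them:
/var/lib/kubelet/pods/<uid>/volumes -> removeOrphanedPodVolumeDirs
/var/lib/kubelet/pods/<uid>/volume-subpaths -> CleanSubPaths during unmount (implemented for sylixos later in this commit), or the subdir loop above
/var/lib/kubelet/pods/<uid>/plugins -> the subdir loop above
/var/lib/kubelet/pods/<uid>/containers -> the subdir loop above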

View File

@ -1212,6 +1212,7 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{})
var lastSyncTime time.Time
for range podUpdates {
ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
klog.V(4).InfoS("Processing pod event", "podUID", podUID, "canStart", canStart, "canEverStart", canEverStart, "ok", ok)
// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
if !ok {
continue
@ -1227,7 +1228,7 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{})
podUID, podRef := podUIDAndRefForUpdate(update.Options)
klog.V(4).InfoS("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
klog.V(4).InfoS("Processing pod event", "podRef", podRef, "podUID", podUID, "updateType", update.WorkType)
var isTerminal bool
err := func() error {
// The worker is responsible for ensuring the sync method sees the appropriate
@ -1251,6 +1252,11 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{})
// container's status is garbage collected before we have a chance to update the
// API server (thus losing the exit code).
status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)
klog.V(5).InfoStdf("podWorkerLoop via cache lastSyncTime=%s now=%s\nSandboxStatusesLength=%d ContainerStatusesLength=%d\nstatus=%+v",
lastSyncTime.String(), time.Now().String(), len(status.SandboxStatuses), len(status.ContainerStatuses), status)
for i, sandboxStatus := range status.SandboxStatuses {
klog.V(3).InfoStdf("Looking SandboxStatuses[%d]=%+v", i, sandboxStatus)
}
if err != nil {
// This is the legacy event thrown by manage pod loop all other events are now dispatched
@ -1280,13 +1286,15 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{})
}
default:
klog.V(5).InfoStdf("Default podWorkerLoop pod %s SandboxStatusesLength=%d ContainerStatusesLength=%d", update.Options.Pod.Name, len(status.SandboxStatuses), len(status.ContainerStatuses))
isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
}
lastSyncTime = p.clock.Now()
klog.V(2).InfoStdf("Syncing podWorkerLoop lastSyncTime=%s now=%s", lastSyncTime.String(), time.Now().String())
return err
}()
klog.V(2).InfoStdS("Syncing pod end", "pod", podRef, "podUID", podUID, "err", err)
var phaseTransition bool
switch {
case err == context.Canceled:
@ -1295,7 +1303,7 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{})
case err != nil:
// we will queue a retry
klog.ErrorS(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID)
klog.ErrorStdS(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID)
case update.WorkType == TerminatedPod:
// we can shut down the worker

View File

@ -123,16 +123,17 @@ func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Dura
}
if kl.isPodRunning(pod, status) {
klog.InfoS("Pod's containers running", "pod", klog.KObj(pod))
klog.InfoStdS("Pod's containers running", "pod", klog.KObj(pod))
return nil
}
klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod))
klog.InfoStdS("Pod's containers not running: syncing", "pod", klog.KObj(pod))
klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
klog.InfoStdS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
klog.V(2).InfoStdf("Kubelet runPod start SyncPod SandboxStatuses length=%d", len(status.SandboxStatuses))
if isTerminal, err = kl.SyncPod(ctx, kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
}
@ -140,7 +141,7 @@ func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Dura
return fmt.Errorf("timeout error: pod %q containers not running after %d retries", format.Pod(pod), runOnceMaxRetries)
}
// TODO(proppy): health checking would be better than waiting + checking the state at the next iteration.
klog.InfoS("Pod's containers synced, waiting", "pod", klog.KObj(pod), "duration", delay)
klog.InfoStdS("Pod's containers synced, waiting", "pod", klog.KObj(pod), "duration", delay)
time.Sleep(delay) // default 1s
retry++
delay *= runOnceRetryDelayBackoff

View File

@ -339,6 +339,7 @@ type configMapVolumeUnmounter struct {
var _ volume.Unmounter = &configMapVolumeUnmounter{}
func (c *configMapVolumeUnmounter) TearDown() error {
klog.V(3).Infof("TearDown configMapVolume podUID=%s, volume=%s", c.podUID, c.volName)
return c.TearDownAt(c.GetPath())
}

View File

@ -494,17 +494,22 @@ func (ed *emptyDir) GetPath() string {
// TearDown simply discards everything in the directory.
func (ed *emptyDir) TearDown() error {
klog.V(3).Infof("TearDown emptyDir podUID=%s, volume=%s", ed.pod.UID, ed.volName)
return ed.TearDownAt(ed.GetPath())
}
// TearDownAt simply discards everything in the directory.
func (ed *emptyDir) TearDownAt(dir string) error {
func (ed *emptyDir) TearDownAt(dir string) error { // /var/lib/kubelet/pods/049b01bc-b5ec-4e0a-9c48-91fd0e16c420/volumes/kubernetes.io~projected/kube-api-access-jzg5v
// First remove ready dir which created in SetUp func
readyDir := ed.getMetaDir()
readyDir := ed.getMetaDir() // /var/lib/kubelet/pods/049b01bc-b5ec-4e0a-9c48-91fd0e16c420/plugins/kubernetes.io~empty-dir/wrapped_kube-api-access-jzg5v
klog.V(4).Infof("TearDownAt emptyDir readyDir=%s, dir=%s", readyDir, dir)
if removeErr := os.RemoveAll(readyDir); removeErr != nil && !os.IsNotExist(removeErr) {
return fmt.Errorf("failed to remove ready dir [%s]: %v", readyDir, removeErr)
}
klog.V(4).Infof("TearDownAt Removed readyDir %s", readyDir)
if pathExists, pathErr := mount.PathExists(dir); pathErr != nil {
return fmt.Errorf("error checking if path exists: %w", pathErr)
} else if !pathExists {
@ -514,6 +519,7 @@ func (ed *emptyDir) TearDownAt(dir string) error {
// Figure out the medium.
medium, isMnt, _, err := ed.mountDetector.GetMountMedium(dir, ed.medium)
klog.V(4).Infof("TearDownAt GetMountMedium volName=%s, medium=%v, isMnt=%v, err=%v", ed.volName, medium, isMnt, err)
if err != nil {
return err
}
@ -540,17 +546,21 @@ func (ed *emptyDir) teardownDefault(dir string) error {
}
// Renaming the directory is not required anymore because the operation executor
// now handles duplicate operations on the same volume
klog.V(4).Infof("Removing directory when teardownDefault %s", dir)
return os.RemoveAll(dir)
}
func (ed *emptyDir) teardownTmpfsOrHugetlbfs(dir string) error {
klog.V(4).InfoStdf("Unmounting tmpfs or hugetlbfs volume at %s", dir)
if ed.mounter == nil {
return fmt.Errorf("memory storage requested, but mounter is nil")
}
if err := ed.mounter.Unmount(dir); err != nil {
klog.Errorf("Failed to unmount tmpfs or hugetlbfs volume at %s: %v", dir, err)
return err
}
if err := os.RemoveAll(dir); err != nil {
klog.Errorf("Failed to remove tmpfs or hugetlbfs volume at %s: %v", dir, err)
return err
}
return nil

View File

@ -18,6 +18,7 @@ package projected
import (
"fmt"
"reflect"
authenticationv1 "k8s.io/api/authentication/v1"
v1 "k8s.io/api/core/v1"
@ -404,20 +405,24 @@ type projectedVolumeUnmounter struct {
var _ volume.Unmounter = &projectedVolumeUnmounter{}
func (c *projectedVolumeUnmounter) TearDown() error {
klog.V(3).Infof("TearDown projectedVolume podUID=%s, volume=%s", c.podUID, c.volName)
return c.TearDownAt(c.GetPath())
}
func (c *projectedVolumeUnmounter) TearDownAt(dir string) error {
klog.V(3).Infof("Tearing down volume %v for pod %v at %v", c.volName, c.podUID, dir)
klog.V(3).Infof("Tearing down volume %s for pod %s at %s", c.volName, c.podUID, dir)
wrapped, err := c.plugin.host.NewWrapperUnmounter(c.volName, wrappedVolumeSpec(), c.podUID)
klog.Errorf("TearDownAt projectedVolume %s, err: %v; wrapped type: %v", c.volName, err, reflect.TypeOf(wrapped))
if err != nil {
return err
}
if err = wrapped.TearDownAt(dir); err != nil {
klog.Errorf("Error tearing down projectedVolume %s: %s", c.volName, err)
return err
}
klog.V(3).Infof("TearDownAt success for volume=%s, pod=%s, dir=%s", c.volName, c.podUID, dir)
c.plugin.deleteServiceAccountToken(c.podUID)
return nil
}

View File

@ -321,6 +321,7 @@ type secretVolumeUnmounter struct {
var _ volume.Unmounter = &secretVolumeUnmounter{}
func (c *secretVolumeUnmounter) TearDown() error {
klog.V(3).Infof("TearDown secretVolume podUID=%s, volume=%s", c.podUID, c.volName)
return c.TearDownAt(c.GetPath())
}

View File

@ -150,6 +150,8 @@ func (grm *nestedPendingOperations) Run(
opKey := operationKey{volumeName, podName, nodeName}
opExists, previousOpIndex := grm.isOperationExists(opKey)
klog.V(3).InfoStdf("nestedPendingOperations.Run: volumeName=%s, podName=%s, opExists=%v, previousOpIndex=%d, operationName=%s",
volumeName, podName, opExists, previousOpIndex, generatedOperations.OperationName) // vc13
if opExists {
previousOp := grm.operations[previousOpIndex]
// Operation already exists

View File

@ -975,6 +975,7 @@ func (oe *operationExecutor) UnmountVolume(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
podsDir string) error {
klog.InfoStdf("operationExecutor.UnmountVolume start to unmount volume %s ...", volumeToUnmount.VolumeName)
fsVolume, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
if err != nil {
return err
@ -992,6 +993,7 @@ func (oe *operationExecutor) UnmountVolume(
volumeToUnmount, actualStateOfWorld)
}
if err != nil {
klog.Errorf("UnmountVolume failed for volume %s : %v", volumeToUnmount.VolumeName, err)
return err
}
// All volume plugins can execute unmount/unmap for multiple pods referencing the

View File

@ -581,7 +581,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
klog.V(3).InfoStdf("mountVolumeFunc start checkMountOptionSupport. volume %s; volumePluginMgr type: %v; volumePlugin type: %v; volumeMounter type: %v",
volumeToMount.VolumeName, reflect.TypeOf(og.volumePluginMgr), reflect.TypeOf(volumePlugin), reflect.TypeOf(volumeMounter)) // vc
// volumePluginMgr type: *volume.VolumePluginMgr; volumePlugin type: *secret.secretPlugin; volumeMounter type: *secret.secretVolumeMounter
mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin)
if mountCheckError != nil {
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError)
@ -869,6 +869,8 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
podsDir string) (volumetypes.GeneratedOperations, error) {
klog.V(4).InfoS("GenerateUnmountVolumeFunc called", "volumeName", volumeToUnmount.VolumeName, "podsDir", podsDir, "podUID", volumeToUnmount.PodUID)
// Get mountable plugin
volumePlugin, err := og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName)
if err != nil || volumePlugin == nil {
@ -877,21 +879,26 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc(
volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter(
volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID)
if newUnmounterErr != nil {
klog.V(4).InfoS("GenerateUnmountVolumeFunc NewUnmounter failed", "volumeName", volumeToUnmount.VolumeName, "podUID", volumeToUnmount.PodUID, "err", err)
return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr)
}
unmountVolumeFunc := func() volumetypes.OperationContext {
klog.V(4).InfoS("Start run unmountVolumeFunc", "volumeName", volumeToUnmount.VolumeName)
subpather := og.volumePluginMgr.Host.GetSubpather()
migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec)
// Remove all bind-mounts for subPaths
podDir := filepath.Join(podsDir, string(volumeToUnmount.PodUID))
klog.V(4).InfoS("CleanSubPaths calling", "podDir", podDir, "volumeName", volumeToUnmount.VolumeName)
if err := subpather.CleanSubPaths(podDir, volumeToUnmount.InnerVolumeSpecName); err != nil {
eventErr, detailedErr := volumeToUnmount.GenerateError("error cleaning subPath mounts", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
klog.V(4).InfoS("Start TearDown volume...", "volumeName", volumeToUnmount.VolumeName)
// Execute unmount
unmountErr := volumeUnmounter.TearDown()
if unmountErr != nil {
@ -934,6 +941,7 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc(
klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeAsUnmounted failed", markVolMountedErr).Error())
}
klog.V(4).InfoS("End run unmountVolumeFunc", "volumeName", volumeToUnmount.VolumeName)
return volumetypes.NewOperationContext(nil, nil, migrated)
}
klog.V(3).InfoStdf("GeneratedOperationGenerator GenerateUnmountVolumeFunc making GeneratedOperations...")

View File

@ -326,12 +326,15 @@ func doCleanSubPath(mounter mount.Interface, fullContainerDirPath, subPathIndex
// cleanSubPath will teardown the subpath bind mount and any remove any directories if empty
func cleanSubPath(mounter mount.Interface, subpath Subpath) error {
containerDir := filepath.Join(subpath.PodDir, containerSubPathDirectoryName, subpath.VolumeName, subpath.ContainerName)
klog.V(4).InfoStdf("Cleaning up subpath %s", containerDir)
// Clean subdir bindmount
if err := doCleanSubPath(mounter, containerDir, strconv.Itoa(subpath.VolumeMountIndex)); err != nil && !os.IsNotExist(err) {
return err
}
klog.V(4).InfoStdf("Removing subpath %s", subpath.PodDir)
// Recusively remove directories if empty
if err := removeEmptyDirs(subpath.PodDir, containerDir); err != nil {
return err
@ -343,6 +346,7 @@ func cleanSubPath(mounter mount.Interface, subpath Subpath) error {
// removeEmptyDirs works backwards from endDir to baseDir and removes each directory
// if it is empty. It stops once it encounters a directory that has content
func removeEmptyDirs(baseDir, endDir string) error {
klog.V(4).Infof("Removing empty dirs from %s to %s", baseDir, endDir)
if !mount.PathWithinBase(endDir, baseDir) {
return fmt.Errorf("endDir %q is not within baseDir %q", endDir, baseDir)
}
@ -360,6 +364,7 @@ func removeEmptyDirs(baseDir, endDir string) error {
return fmt.Errorf("path %q not a directory", curDir)
}
klog.V(5).Infof("Checking dir is exist, removing %s", curDir)
err = os.Remove(curDir)
if os.IsExist(err) {
klog.V(5).Infof("Directory %q not empty, not removing", curDir)

View File

@ -21,10 +21,15 @@ package subpath
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"k8s.io/mount-utils"
"k8s.io/utils/nsenter"
"k8s.io/klog/v2"
)
/*
TODO: these attrs are not supported by the current unix/syscall package:
Recheck this file after updating the new package.
*/
type subpath struct{}
type subpath struct {
mounter mount.Interface
}
var errUnsupported = errors.New("util/subpath on this platform is not supported")
@ -59,10 +66,101 @@ func (sp *subpath) PrepareSafeSubpath(subPath Subpath) (newHostPath string, clea
return subPath.Path, nil, nil
}
func (sp *subpath) CleanSubPaths(podDir string, volumeName string) error {
return errUnsupported
}
func (sp *subpath) SafeMakeDir(pathname string, base string, perm os.FileMode) error {
return errUnsupported
}
func (sp *subpath) CleanSubPaths(podDir string, volumeName string) error {
return doCleanSubPaths(sp.mounter, podDir, volumeName)
}
// doCleanSubPath tears down the single subpath bind mount
func doCleanSubPath(_ mount.Interface, fullContainerDirPath, subPathIndex string) error {
// process /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/<container name>/<subPathName>
klog.V(4).Infof("Cleaning up subpath mounts for subpath=%s, fullContainerDirPath=%s", subPathIndex, fullContainerDirPath)
fullSubPath := filepath.Join(fullContainerDirPath, subPathIndex)
if err := os.RemoveAll(fullSubPath); err != nil {
return fmt.Errorf("error cleaning subpath on sylix %s: %s", fullSubPath, err)
}
klog.V(4).Infof("Successfully cleaned subpath directory %s", fullSubPath)
return nil
}
const (
// place for subpath mounts
// TODO: pass in directory using kubelet_getters instead
containerSubPathDirectoryName = "volume-subpaths"
)
func doCleanSubPaths(mounter mount.Interface, podDir string, volumeName string) error {
// scan /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/*
subPathDir := filepath.Join(podDir, containerSubPathDirectoryName, volumeName)
klog.V(4).Infof("Cleaning up subpaths mounts for %s, podDir=%s, volumeName=%s", subPathDir, podDir, volumeName)
containerDirs, err := ioutil.ReadDir(subPathDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("error reading %s: %s", subPathDir, err)
}
for _, containerDir := range containerDirs {
if !containerDir.IsDir() {
klog.V(4).Infof("Container file is not a directory: %s", containerDir.Name())
continue
}
klog.V(4).Infof("Cleaning dir for container %s, containerDir=%s", containerDir.Name(), containerDir)
// scan /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/<container name>/*
fullContainerDirPath := filepath.Join(subPathDir, containerDir.Name())
// The original traversal method here was ReadDir, which was not so robust to handle some error such as "stale NFS file handle",
// so it was replaced with filepath.Walk in a later patch, which can pass through error and handled by the callback WalkFunc.
// After go 1.16, WalkDir was introduced, it's more effective than Walk because the callback WalkDirFunc is called before
// reading a directory, making it save some time when a container's subPath contains lots of dirs.
// See https://github.com/kubernetes/kubernetes/pull/71804 and https://github.com/kubernetes/kubernetes/issues/107667 for more details.
err = filepath.WalkDir(fullContainerDirPath, func(path string, info os.DirEntry, _ error) error {
if path == fullContainerDirPath {
// Skip top level directory
return nil
}
// pass through errors and let doCleanSubPath handle them
if err = doCleanSubPath(mounter, fullContainerDirPath, filepath.Base(path)); err != nil {
return err
}
// We need to check that info is not nil. This may happen when the incoming err is not nil due to stale mounts or permission errors.
if info != nil && info.IsDir() {
// skip subdirs of the volume: it only matters the first level to unmount, otherwise it would try to unmount subdir of the volume
return filepath.SkipDir
}
return nil
})
if err != nil {
return fmt.Errorf("error processing %s: %s", fullContainerDirPath, err)
}
// Whole container has been processed, remove its directory.
if err := os.Remove(fullContainerDirPath); err != nil {
return fmt.Errorf("error deleting %s: %s", fullContainerDirPath, err)
}
klog.V(5).Infof("Removed fullContainerDirPath %s from %s", fullContainerDirPath, podDir)
}
// Whole pod volume subpaths have been cleaned up, remove its subpath directory.
if err := os.Remove(subPathDir); err != nil {
return fmt.Errorf("error deleting %s: %s", subPathDir, err)
}
klog.V(5).Infof("Removed subPathDir %s for podDir %s, volumeName=%s", subPathDir, podDir, volumeName)
// Remove entire subpath directory if it's the last one
podSubPathDir := filepath.Join(podDir, containerSubPathDirectoryName)
if err := os.Remove(podSubPathDir); err != nil && !os.IsExist(err) {
return fmt.Errorf("error deleting %s: %s", podSubPathDir, err)
}
klog.V(5).Infof("Removed podSubPathDir %s", podSubPathDir)
return nil
}
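For illustration, a minimal sketch (not part of the commit) of the removal order that doCleanSubPaths above performs for a single subpath tree on sylixos. The pod UID is reused from an example path earlier in this diff; the volume name "config", container name "app" and subPath index "0" are made-up assumptions, and errors are ignored for brevity:
package main

import (
	"os"
	"path/filepath"
)

func main() {
	// Hypothetical pod directory (/var/lib/kubelet/pods/<uid>).
	podDir := "/var/lib/kubelet/pods/049b01bc-b5ec-4e0a-9c48-91fd0e16c420"
	volDir := filepath.Join(podDir, "volume-subpaths", "config") // hypothetical volume "config"
	ctrDir := filepath.Join(volDir, "app")                       // hypothetical container "app"

	_ = os.RemoveAll(filepath.Join(ctrDir, "0")) // doCleanSubPath: remove each subPath index dir
	_ = os.Remove(ctrDir)                        // then the container dir, once its index dirs are gone
	_ = os.Remove(volDir)                        // then the volume dir under volume-subpaths
	_ = os.Remove(filepath.Join(podDir, "volume-subpaths")) // finally the pod-level dir, if nothing else is left
}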

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
)
@ -62,6 +63,7 @@ type CompleteFuncParam struct {
// Run executes the operations and its supporting functions
func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
klog.V(3).InfoStdf("GeneratedOperations running all OperationFunc...") // vc14
var context OperationContext
if o.CompleteFunc != nil {
c := CompleteFuncParam{
@ -70,6 +72,7 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
}
defer o.CompleteFunc(c)
}
klog.V(2).InfoStdf("GeneratedOperations running operation EventRecorderFunc and OperationFunc")
if o.EventRecorderFunc != nil {
defer o.EventRecorderFunc(&eventErr)
}

View File

@ -236,6 +236,7 @@ func UnmountViaEmptyDir(dir string, host volume.VolumeHost, volName string, volS
// Wrap EmptyDir, let it do the teardown.
wrapped, err := host.NewWrapperUnmounter(volName, volSpec, podUID)
klog.Infof("TearDownAt ViaEmptyDir %s, err: %v; wrapped type: %v", volName, err, reflect.TypeOf(wrapped))
if err != nil {
return err
}