Revision 373d2cac328804a094206179e18feb4a1fd6aaae authored by Martynas Pumputis on 06 December 2021, 15:18:14 UTC, committed by Martynas Pumputis on 07 December 2021, 14:33:25 UTC
Signed-off-by: Martynas Pumputis <m@lambda.lt>
1 parent 37c3499
k8s_cep_gc.go
// SPDX-License-Identifier: Apache-2.0
// Copyright 2016-2020 Authors of Cilium

package main

import (
	"context"
	"time"

	operatorOption "github.com/cilium/cilium/operator/option"
	"github.com/cilium/cilium/operator/watchers"
	"github.com/cilium/cilium/pkg/controller"
	"github.com/cilium/cilium/pkg/k8s"
	cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
	slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
	k8sUtils "github.com/cilium/cilium/pkg/k8s/utils"
	"github.com/cilium/cilium/pkg/logging/logfields"

	"github.com/sirupsen/logrus"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// enableCiliumEndpointSyncGC starts the node-singleton sweeper for
// CiliumEndpoint objects where the managing node is no longer running. These
// objects are created by the sync-to-k8s-ciliumendpoint controller on each
// Endpoint.
// The general steps are:
//   - list all CEPs in the cluster
//   - for each CEP
//       delete CEP if the corresponding pod does not exist
// CiliumEndpoint objects have the same name as the pod they represent
func enableCiliumEndpointSyncGC(once bool) {
	var (
		controllerName = "to-k8s-ciliumendpoint-gc"
		scopedLog      = log.WithField("controller", controllerName)
		gcInterval     time.Duration
		stopCh         = make(chan struct{})
	)

	ciliumClient := ciliumK8sClient.CiliumV2()

	if once {
		log.Info("Running the garbage collector only once to clean up leftover CiliumEndpoint custom resources")
		gcInterval = 0
	} else {
		log.Info("Starting to garbage collect stale CiliumEndpoint custom resources")
		gcInterval = operatorOption.Config.EndpointGCInterval
	}
	// This function will block until the resources are synced with k8s.
	watchers.CiliumEndpointsInit(ciliumClient, stopCh)
	if !once {
		// If we are running this function "once" it means that we will delete
		// all CEPs in the cluster regardless of the pod state, so the pod
		// watcher is only needed for the periodic GC.
		watchers.PodsInit(k8s.WatcherClient(), stopCh)
	}
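	// Block until the CiliumNode store has been populated; the GC below
	// consults ciliumNodeStore when resolving CiliumNode owner references.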
	<-k8sCiliumNodesCacheSynced

	// this dummy manager is needed only to add this controller to the global list
	controller.NewManager().UpdateController(controllerName,
		controller.ControllerParams{
			RunInterval: gcInterval,
			DoFunc: func(ctx context.Context) error {
				return doCiliumEndpointSyncGC(ctx, once, stopCh, scopedLog)
			},
		})
}

func doCiliumEndpointSyncGC(ctx context.Context, once bool, stopCh chan struct{}, scopedLog *logrus.Entry) error {
	ciliumClient := ciliumK8sClient.CiliumV2()

	// For each CEP we fetched, check if we know about it
	for _, cepObj := range watchers.CiliumEndpointStore.List() {
		cep, ok := cepObj.(*cilium_v2.CiliumEndpoint)
		if !ok {
			log.WithField(logfields.Object, cepObj).
				Errorf("Saw %T object while expecting *cilium_v2.CiliumEndpoint", cepObj)
			continue
		}
		cepFullName := cep.Namespace + "/" + cep.Name
		scopedLog = scopedLog.WithFields(logrus.Fields{
			logfields.K8sPodName: cepFullName,
		})

		// If we are running this function "once" it means that we will delete
		// all CEPs in the cluster regardless of the pod state, therefore we
		// won't even watch for the pod store.
		if !once {
			var podObj interface{}
			var err error
			exists := false
			podChecked := false
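			// Walk the CEP's owner references: a Pod owner shares the CEP's
			// namespace/name key, while a CiliumNode owner is looked up by its
			// node name. The loop stops at the first owner that still exists.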
			for _, owner := range cep.ObjectMeta.OwnerReferences {
				switch owner.Kind {
				case "Pod":
					podObj, exists, err = watchers.PodStore.GetByKey(cepFullName)
					if err != nil {
						scopedLog.WithError(err).Warn("Unable to get pod from store")
					}
					podChecked = true
				case "CiliumNode":
					podObj, exists, err = ciliumNodeStore.GetByKey(owner.Name)
					if err != nil {
						scopedLog.WithError(err).Warn("Unable to get CiliumNode from store")
					}
				}
				// Stop looking when an existing owner has been found
				if exists {
					break
				}
			}
			if !exists && !podChecked {
				// Check for a Pod in case none of the owners existed
				// This keeps the old behavior even if OwnerReferences are missing
				podObj, exists, err = watchers.PodStore.GetByKey(cepFullName)
				if err != nil {
					scopedLog.WithError(err).Warn("Unable to get pod from store")
				}
			}
			if exists {
				switch pod := podObj.(type) {
				case *cilium_v2.CiliumNode:
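					// The owning CiliumNode still exists, so the CEP is not
					// orphaned; keep it.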
					continue
				case *slim_corev1.Pod:
					// In Kubernetes Jobs, Pods can be left in Kubernetes until the Job
					// is deleted. If the Job is never deleted, Cilium will never receive a Pod
					// delete event, causing the IP to be left in the ipcache.
					// For this reason we should delete the ipcache entries whenever the pod
					// status is either PodFailed or PodSucceeded as it means the IP address
					// is no longer in use.
					if k8sUtils.IsPodRunning(pod.Status) {
						continue
					}
				default:
					log.WithField(logfields.Object, podObj).
						Errorf("Saw %T object while expecting *slim_corev1.Pod or *cilium_v2.CiliumNode", podObj)
					continue
				}
			}
		}

		// FIXME: this is fragile as we might have received the
		// CEP notification first but not the pod notification
		// so we need to have a similar mechanism that we have
		// for the keep alive of security identities.
		scopedLog = scopedLog.WithFields(logrus.Fields{
			logfields.EndpointID: cep.Status.ID,
		})
		scopedLog.Debug("Orphaned CiliumEndpoint is being garbage collected")
		PropagationPolicy := meta_v1.DeletePropagationBackground // because these are const strings but the API wants pointers
		err := ciliumClient.CiliumEndpoints(cep.Namespace).Delete(
			ctx,
			cep.Name,
			meta_v1.DeleteOptions{PropagationPolicy: &PropagationPolicy})
		if err != nil && !k8serrors.IsNotFound(err) {
			scopedLog.WithError(err).Warning("Unable to delete orphaned CEP")
			return err
		}
	}
	// We have cleaned up all CEPs from Kubernetes so we can stop
	// the k8s watchers.
	if once {
		close(stopCh)
	}
	return nil
}
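
For orientation, the following is a minimal sketch of how a caller could drive this garbage collector. enableCiliumEndpointSyncGC and operatorOption.Config.EndpointGCInterval come from the code above, but the wrapper function startCEPGC and the exact call-site logic are assumptions for illustration, not part of this commit.

// startCEPGC is a hypothetical helper (not part of this commit) showing how a
// caller in package main could pick between one-shot and periodic
// CiliumEndpoint GC based on the configured EndpointGCInterval.
func startCEPGC() {
	if operatorOption.Config.EndpointGCInterval == 0 {
		// Interval disabled: still sweep once to remove leftover CEPs.
		enableCiliumEndpointSyncGC(true)
	} else {
		// Periodic GC, re-run by the controller at EndpointGCInterval.
		enableCiliumEndpointSyncGC(false)
	}
}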